# CommandLine.py -- Pamhyr
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging

from datetime import datetime
from tools import timer, parse_command_line, get_version, logger_exception

try:
    # Installation allow Unix-like signal
    from signal import SIGTERM, SIGSTOP, SIGCONT
    _signal = True
except Exception:
    _signal = False

from enum import Enum

from Model.Except import NotImplementedMethodeError

from Model.Results.Results import Results
from Model.Results.River.River import River, Reach, Profile

from Solver.ASolver import AbstractSolver, STATUS

logger = logging.getLogger()


class CommandLineSolver(AbstractSolver):
    """Solver driven by external command lines.

    A run chains three steps -- input data formatter, solver, output
    data formatter -- each one started on the process object given to
    `run`. A step whose command string is empty is skipped.

    The process object is presumably a Qt QProcess (it is used through
    `start`, `waitForStarted`, `readAll`, `readyRead`, `finished`,
    ...) -- TODO confirm against callers.
    """

    _type = ""

    def __init__(self, name):
        """Create a solver called `name` with no command configured."""
        super(CommandLineSolver, self).__init__(name)

        self._current_process = None
        self._status = STATUS.NOT_LAUNCHED

        # Path and command line template of each step
        self._path_input = ""
        self._path_solver = ""
        self._path_output = ""

        self._cmd_input = ""
        self._cmd_solver = ""
        self._cmd_output = ""

        # Process object and output queue, both set by run()
        self._process = None
        self._output = None

        # Last study running
        self._study = None

    @classmethod
    def default_parameters(cls):
        """Return the parent default parameters plus the command line
        arguments parameter

        Returns:
            List of (key, default value) tuples
        """
        lst = super(CommandLineSolver, cls).default_parameters()

        lst += [
            ("all_command_line_arguments", ""),
        ]

        return lst

    def set_input(self, path, cmd):
        """Set the input formatter path and command line template"""
        self._path_input = path
        self._cmd_input = cmd

    def set_solver(self, path, cmd):
        """Set the solver path and command line template"""
        self._path_solver = path
        self._cmd_solver = cmd

    def set_output(self, path, cmd):
        """Set the output formatter path and command line template"""
        self._path_output = path
        self._cmd_output = cmd

    ##########
    # Export #
    ##########

    def cmd_args(self, study):
        """Return solver command line arguments list

        Args:
            study: The study whose river parameters are read

        Returns:
            Command line arguments list
        """
        params = study.river.get_params(self.type)
        args = params.get_by_key("all_command_line_arguments")

        return args.split(" ")

    def input_param(self):
        """Return input command line parameter(s)

        Returns:
            Returns input parameter(s) string
        """
        raise NotImplementedMethodeError(self, self.input_param)

    def output_param(self):
        """Return output command line parameter(s)

        Returns:
            Returns output parameter(s) string
        """
        raise NotImplementedMethodeError(self, self.output_param)

    def log_file(self):
        """Return log file name

        Returns:
            Returns log file name as string
        """
        raise NotImplementedMethodeError(self, self.log_file)

    def export_additional_files(self, study, repertory, qlog, name="0"):
        """Write every additional file of the study into `repertory`

        Args:
            study: The study holding the additional files
            repertory: Destination directory
            qlog: Optional queue receiving a progress message
            name: Unused here

        Returns:
            List of the relative paths that were written
        """
        files = []

        if qlog is not None:
            qlog.put("Export additional files")

        add_files = study.river.additional_files.files
        for add_file in add_files:
            self.export_additional_file(
                study, add_file, repertory, files
            )

        return files

    def export_additional_file(self, study, add_file, repertory, files):
        """Write one additional file and record its path in `files`

        Files with an empty path or that are not enabled are skipped.
        The "@version" and "@date" markers of the file text are
        replaced before writing.

        Args:
            study: The study (not used here)
            add_file: The additional file to write
            repertory: Destination directory
            files: List extended in place with the written path

        Returns:
            The `files` list
        """
        if add_file.path == "" or not add_file.is_enabled():
            return files

        path = os.path.join(repertory, add_file.path)
        directory = os.path.dirname(path)
        # os.makedirs("") raises, so only create a directory when the
        # joined path actually has a directory component
        if directory:
            os.makedirs(directory, exist_ok=True)

        with open(path, "w+") as f:
            files.append(add_file.path)

            txt = add_file.text
            txt = txt.replace("@version", get_version())
            txt = txt.replace("@date", datetime.now().isoformat(sep=" "))

            f.write(txt)

        return files

    def export_study_description(self, study, repertory, qlog, name="0"):
        """Write the study description in
        "pamhyr-study-description.txt" inside `repertory`

        Args:
            study: The study whose description is written
            repertory: Destination directory
            qlog: Unused here
            name: Unused here
        """
        path = os.path.join(
            repertory, "pamhyr-study-description.txt"
        )

        with open(path, "w+", encoding='utf-8') as f:
            # Encode with 'replace' as well: a strict encode raises
            # on characters that cannot be encoded (lone surrogates)
            # before the lenient decode has a chance to clean them.
            txt = study.description\
                       .encode('utf-8', 'replace')\
                       .decode('utf-8', 'replace')
            f.write(txt)

    #######
    # Run #
    #######

    def _install_dir(self):
        """Return the absolute path of the directory two levels above
        the directory of this file"""
        return os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                "..", ".."
            )
        )

    def _format_command(self, study, cmd, path=""):
        """Format command line

        The markers @install_dir, @path, @input, @output, @dir and
        @args are replaced, in this order, then the result is split
        with parse_command_line.

        Args:
            study: The study (used for the @args marker)
            cmd: The command line
            path: Optional path string (replace @path in cmd)

        Returns:
            The executable and list of arguments
        """
        cmd = cmd.replace("@install_dir", self._install_dir())
        # @path is inserted between double quotes
        cmd = cmd.replace("@path", "\"" + path + "\"")
        # Spaces of the input parameter are replaced by underscores
        cmd = cmd.replace("@input", self.input_param().replace(" ", "_"))
        cmd = cmd.replace("@output", self.output_param())
        cmd = cmd.replace("@dir", self._process.workingDirectory())
        cmd = cmd.replace("@args", " ".join(self.cmd_args(study)))

        logger.debug(f"! {cmd}")

        words = parse_command_line(cmd)
        exe = words[0]
        args = words[1:]

        logger.info(f"! {exe} {args}")
        return exe, args

    def _notify(self, message):
        """Put `message` in the output queue, if there is one"""
        if self._output is not None:
            self._output.put(message)

    def _start_executable(self, exe, args):
        """Start `exe` with `args` on the current process

        Returns:
            True when the process has been started, otherwise the
            error message as a string
        """
        if not os.path.exists(exe):
            error = f"[ERROR] Path {exe} do not exists"
            logger.warning(error)
            return error

        self._process.start(
            exe, args,
        )
        self._process.waitForStarted()

        return True

    def run_input_data_fomater(self, study):
        """Start the input data formatter step

        With an empty input command the step is skipped and the next
        one is run.

        Returns:
            True on success, otherwise an error message string
        """
        if self._cmd_input == "":
            self._run_next(study)
            return True

        exe, args = self._format_command(
            study, self._cmd_input, self._path_input
        )

        return self._start_executable(exe, args)

    def run_solver(self, study):
        """Start the solver step

        With an empty solver command the step is skipped and the next
        one is run.

        Returns:
            True on success, otherwise an error message string
        """
        if self._cmd_solver == "":
            self._run_next(study)
            return True

        exe, args = self._format_command(
            study, self._cmd_solver, self._path_solver
        )

        logger.debug(f"parsed cmd solver exe : {exe}")
        logger.debug(f"parsed cmd solver args : {args}")

        res = self._start_executable(exe, args)
        if res is True:
            # Only a solver that really started is marked as running
            self._status = STATUS.RUNNING

        return res

    def run_output_data_fomater(self, study):
        """Start the output data formatter step

        With an empty output command the step is skipped and the next
        one is run.

        Returns:
            True on success, otherwise an error message string
        """
        if self._cmd_output == "":
            self._run_next(study)
            return True

        exe, args = self._format_command(
            study, self._cmd_output, self._path_output
        )

        return self._start_executable(exe, args)

    def _data_ready(self):
        """Slot connected to the process readyRead signal: forward
        the process output, split on os.linesep, to the output queue"""
        try:
            # Read process output and put lines in queue
            s = self._process.readAll().data().decode('utf-8', "replace")

            if self._output is not None:
                for x in s.split(os.linesep):
                    self._output.put(x)
        except Exception as e:
            logger_exception(e)

    def _finished(self, study, exit_code, exit_status):
        """Slot connected to the process finished signal: publish the
        exit code and run the next step"""
        self._notify(exit_code)

        logger.debug(
            "Process finished with " +
            f"code: {exit_code}, status: {exit_status}"
        )

        self._run_next(study)

    def _run_next(self, study):
        """Run the next step, or mark the solver as stopped when every
        step has been run"""
        self._step += 1

        if self._step >= len(self._runs):
            self._status = STATUS.STOPED
            return

        fn = self._runs[self._step]
        res = fn(study)

        if res is not True:
            # The step returned an error message instead of True
            self._notify(res)

    def run(self, study, process=None, output_queue=None):
        """Run the three steps chain on `study`

        Args:
            study: The study to run
            process: Optional process object replacing the current one
            output_queue: Optional queue replacing the current output
                queue
        """
        logger.debug(f"cmd input : {self._cmd_input}")
        logger.debug(f"cmd solver : {self._cmd_solver}")
        logger.debug(f"cmd output : {self._cmd_output}")

        self._study = study

        # Replace old values if needed
        if process is not None:
            self._process = process
        if output_queue is not None:
            self._output = output_queue

        # Connect / reconnect signal
        # NOTE(review): every call adds new connections; if the same
        # process object is reused for several runs the slots would
        # presumably fire more than once -- confirm callers always
        # give a fresh process.
        self._process.readyRead.connect(self._data_ready)
        self._process.finished.connect(
            lambda c, s: self._finished(study, c, s)
        )

        # Prepare running step
        self._runs = [
            self.run_input_data_fomater,
            self.run_solver,
            self.run_output_data_fomater,
        ]
        self._step = 0

        self._status = STATUS.RUNNING

        # Run first step
        res = self._runs[0](study)
        if res is not True:
            # start() calls run() without queue, so the queue may be
            # missing here
            self._notify(res)

    def kill(self):
        """Kill the process, if any, and mark the solver as stopped

        Returns:
            True
        """
        if self._process is None:
            return True

        self._process.kill()
        self._status = STATUS.STOPED
        return True

    def start(self, study, process=None):
        """Resume a paused solver, otherwise run `study`

        Returns:
            True
        """
        if _signal:
            # Solver is PAUSED, so continue execution
            if self._status == STATUS.PAUSED:
                os.kill(self._process.pid(), SIGCONT)
                self._status = STATUS.RUNNING
                return True

        self.run(study, process)
        return True

    def pause(self):
        """Pause the process with SIGSTOP

        Returns:
            True when the signal has been sent, False when Unix-like
            signals are not available or there is no process
        """
        if _signal:
            if self._process is None:
                return False

            # Send SIGSTOP to PAUSED solver
            os.kill(self._process.pid(), SIGSTOP)
            self._status = STATUS.PAUSED
            return True
        return False

    def stop(self):
        """Terminate the process and mark the solver as stopped

        Returns:
            True when terminate has been requested, False when there
            is no process
        """
        if self._process is None:
            return False

        self._process.terminate()
        self._status = STATUS.STOPED
        return True