# AdisTS.py -- Pamhyr # Copyright (C) 2023-2024 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # -*- coding: utf-8 -*- import os import logging import numpy as np from tools import timer, trace, logger_exception from Solver.CommandLine import CommandLineSolver from Model.Results.Results import Results from Model.Results.River.River import River, Reach, Profile logger = logging.getLogger() def adists_file_open(filepath, mode): f = open(filepath, mode) if "w" in mode: # Write header comment = "*" if ".ST" in filepath: comment = "#" f.write( f"{comment} " + "This file is generated by PAMHYR, please don't modify\n" ) return f class AdisTS(CommandLineSolver): _type = "adists" def __init__(self, name): super(AdisTS, self).__init__(name) self._type = "adists" self._cmd_input = "" self._cmd_solver = "@path @input -o @output" self._cmd_output = "" @classmethod def default_parameters(cls): lst = super(AdisTS, cls).default_parameters() lst += [ ("adists_implicitation_parameter", "0.5"), ("adists_timestep_screen", "60"), ("adists_timestep_bin", "60"), ("adists_timestep_csv", "60"), ("adists_timestep_mage", "60"), ("adists_initial_concentration", "60"), ("adists_output_points_csv", ""), ] return lst def cmd_args(self, study): lst = super(AdisTS, self).cmd_args(study) return lst def input_param(self): name = self._study.name return f"{name}.REP" def _export_REP_additional_lines(self, study, rep_file): lines = filter( lambda line: line.is_enabled(), study.river.rep_lines.lines ) for line in lines: rep_file.write(line.line) def _export_REP(self, study, repertory, files, qlog, name="0"): if qlog is not None: qlog.put("Export REP file") # Write header with adists_file_open( os.path.join( repertory, f"{name}.REP" ), "w+" ) as f: f.write("confirmation=non\n") for file in files: EXT = file.split('.')[1] f.write(f"{EXT} {file}\n") self._export_REP_additional_lines(study, f) @timer def export(self, study, repertory, qlog=None): self._study = study name = study.name.replace(" ", "_") self.export_additional_files(study, repertory, qlog, name=name) return True ################################ # Adis-TS in low coupling mode # ################################ class AdisTSlc(AdisTS): _type = "adistslc" def __init__(self, name): super(AdisTSlc, self).__init__(name) self._type = "adistslc" @classmethod def default_parameters(cls): lst = super(AdisTSlc, cls).default_parameters() # Insert new parameters at specific position names = list(map(lambda t: t[0], lst)) return lst ########## # Export # ########## def cmd_args(self, study): lst = super(AdisTSlc, self).cmd_args(study) return lst def _export_NUM(self, study, repertory, qlog=None, name="0"): files = [] if qlog is not None: qlog.put("Export NUM file") with adists_file_open(os.path.join(repertory, f"{name}.NUM"), "w+") as f: files.append(f"{name}.NUM") params = study.river.get_params(self.type).parameters for p in params: name = p.name\ .replace("all_", "")\ .replace("adists_", "") value = p.value logger.debug( f"export: NUM: {name}: {value} ({p.value})" ) f.write(f"{name} {value}\n") return files def export_func_dict(self): return [ self._export_NUM, ] @timer def export(self, study, repertory, qlog=None, name="0"): self._study = study name = study.name.replace(" ", "_") # Generate files files = [] try: for func in self.export_func_dict(): files = files + func(study, repertory, qlog, name=name) self.export_additional_files(study, repertory, qlog, name=name) self._export_REP(study, repertory, files, qlog, name=name) return True except Exception as e: logger.error(f"Failed to export study to {self._type}") logger_exception(e) return False