# AdisTS.py -- Pamhyr
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging
import numpy as np

from tools import timer, trace, logger_exception

from Solver.CommandLine import CommandLineSolver

from Model.Results.Results import Results
from Model.Results.River.River import River, Reach, Profile

from Checker.Adists import (
    AdistsOutputKpChecker,
)

# Module-wide logger. No name is passed to getLogger(), so this is the
# root logger.
logger = logging.getLogger()

def adists_file_open(filepath, mode):
    """Open a file and, when opened for writing, stamp it with a header.

    Args:
        filepath: Path of the file to open.
        mode: Mode string handed to the builtin ``open``.

    Returns:
        The opened file object.  When ``mode`` contains ``"w"`` a one-line
        "generated by PAMHYR" comment has already been written to it.

    Raises:
        OSError: If the file cannot be opened or the header cannot be
            written (the handle is closed before re-raising).
    """
    f = open(filepath, mode)

    if "w" in mode:
        # The comment marker is "#" when the file *name* contains ".ST"
        # and "*" otherwise.  Only the base name is inspected so that a
        # parent directory whose name contains ".ST" cannot flip the marker.
        file_name = os.path.basename(filepath)
        comment = "#" if ".ST" in file_name else "*"

        try:
            f.write(
                f"{comment} " +
                "This file is generated by PAMHYR, please don't modify\n"
            )
        except Exception:
            # Do not leak the handle when the header cannot be written.
            f.close()
            raise

    return f

class AdisTS(CommandLineSolver):
    """Command-line solver description for Adis-TS.

    The class declares the solver's default parameters and checkers, names
    the REP input file and the TRA log file after the study, and writes the
    REP file that lists the exported input files.
    """

    _type = "adists"

    def __init__(self, name):
        """Create the solver and set its command templates.

        Args:
            name: Solver name, forwarded to ``CommandLineSolver``.
        """
        super(AdisTS, self).__init__(name)

        self._type = "adists"

        # Command templates.  "@path", "@input" and "@output" look like
        # placeholders filled in by CommandLineSolver -- TODO confirm.
        self._cmd_input = ""
        self._cmd_solver = "@path @input -o @output"
        self._cmd_output = ""

    @staticmethod
    def _study_file_name(study):
        """Return the study name as used for exported file names.

        Spaces are replaced by underscores, exactly as ``export`` does, so
        that every file name derived from the study agrees.
        """
        return study.name.replace(" ", "_")

    @classmethod
    def default_parameters(cls):
        """Return the parent's default parameters plus the Adis-TS ones.

        Returns:
            A list of ``(name, default value)`` string pairs.
        """
        lst = super(AdisTS, cls).default_parameters()

        lst += [
            ("adists_implicitation_parameter", "0.5"),
            ("adists_timestep_screen", "60"),
            ("adists_timestep_bin", "60"),
            ("adists_timestep_csv", "60"),
            ("adists_timestep_mage", "60"),
            ("adists_initial_concentration", "60"),
        ]

        return lst

    @classmethod
    def checkers(cls):
        """Return the list of checker instances for this solver."""
        return [
            AdistsOutputKpChecker(),
        ]

    def cmd_args(self, study):
        """Return the command-line arguments computed by the parent class."""
        return super(AdisTS, self).cmd_args(study)

    def input_param(self):
        """Return the name of the REP file of the current study.

        The name is sanitised the same way as in ``export`` so that it
        matches the file actually written when the study name has spaces.
        """
        return f"{self._study_file_name(self._study)}.REP"

    def log_file(self):
        """Return the name of the TRA file of the current study.

        Uses the same sanitised study name as ``input_param``.
        """
        return f"{self._study_file_name(self._study)}.TRA"

    def _export_REP_additional_lines(self, study, rep_file):
        """Append every enabled additional REP line of the study.

        Args:
            study: Study whose ``river.rep_lines.lines`` are read.
            rep_file: Writable file object receiving the lines.
        """
        for line in study.river.rep_lines.lines:
            if line.is_enabled():
                # The line text is written as stored, no newline is added.
                rep_file.write(line.line)

    def _export_REP(self, study, repertory, files, qlog, name="0"):
        """Write the ``<name>.REP`` file listing the exported files.

        Args:
            study: Study being exported.
            repertory: Directory in which the REP file is created.
            files: Names of the exported files; each one is listed as
                ``<extension> <file name>``.
            qlog: Optional queue receiving progress messages, or None.
            name: Base name of the REP file.
        """
        if qlog is not None:
            qlog.put("Export REP file")

        rep_path = os.path.join(repertory, f"{name}.REP")

        # adists_file_open writes the "generated by PAMHYR" header itself.
        with adists_file_open(rep_path, "w+") as f:
            f.write("confirmation=non\n")

            for file in files:
                # The extension is what follows the LAST dot, so that names
                # containing dots (e.g. "my.study.NUM") are handled.
                ext = file.rsplit(".", 1)[-1]
                f.write(f"{ext} {file}\n")

            self._export_REP_additional_lines(study, f)

    @timer
    def export(self, study, repertory, qlog=None):
        """Export the additional files of the study.

        Args:
            study: Study to export; it is remembered in ``self._study``.
            repertory: Destination directory.
            qlog: Optional queue receiving progress messages.

        Returns:
            Always True.
        """
        self._study = study
        name = self._study_file_name(study)

        self.export_additional_files(study, repertory, qlog, name=name)

        return True

    ###########
    # RESULTS #
    ###########

    def read_bin(self, study, repertory, results, qlog=None, name="0"):
        """Placeholder: nothing is read, ``results`` is left untouched."""
        return

    @timer
    def results(self, study, repertory, qlog=None, name="0"):
        """Build the ``Results`` object for a run and pass it to read_bin.

        Args:
            study: Study the results belong to.
            repertory: Directory of the run.
            qlog: Optional queue receiving progress messages.
            name: Name given to the results.

        Returns:
            The ``Results`` instance.
        """
        results = Results(
            study=study,
            solver=self,
            repertory=repertory,
            name=name,
        )
        self.read_bin(study, repertory, results, qlog, name=name)

        return results


################################
# Adis-TS in low coupling mode #
################################


class AdisTSlc(AdisTS):
    """Adis-TS solver in low coupling mode.

    Adds the export of the NUM, DIF and D90 files and of one POL file
    (plus an optional INI file) per pollutant, followed by the REP file
    that lists the NUM, DIF, D90 and POL files.
    """

    _type = "adistslc"

    def __init__(self, name):
        """Create the solver; ``name`` is forwarded to ``AdisTS``."""
        super(AdisTSlc, self).__init__(name)

        self._type = "adistslc"

    @classmethod
    def default_parameters(cls):
        """Return the default parameters, currently those of ``AdisTS``."""
        return super(AdisTSlc, cls).default_parameters()

    ##########
    # Export #
    ##########

    def cmd_args(self, study):
        """Return the command-line arguments computed by the parent class."""
        return super(AdisTSlc, self).cmd_args(study)

    def _export_POLs(self, study, repertory, qlog=None, name="0"):
        """Write one ``<pollutant>.POL`` file per pollutant of the study.

        When an initial condition with a concentration exists for the
        pollutant, a ``<pollutant>.INI`` file is written as well.

        Args:
            study: Study being exported.
            repertory: Destination directory.
            qlog: Optional queue receiving progress messages.
            name: Unused; files are named after each pollutant.

        Returns:
            The list of POL file names written.
        """
        files = []

        if qlog is not None:
            qlog.put("Export POLS files")

        pollutants = study.river.Pollutants.Pollutants_List

        for pollutant in pollutants:
            pol_name = pollutant.name
            file_name = f"{pol_name}.POL"
            path = os.path.join(repertory, file_name)

            with adists_file_open(path, "w+") as f:
                files.append(file_name)
                # NOTE(review): this comment line is the same for every
                # pollutant -- confirm that it is intended.
                f.write("*Polluant A contaminé aux PCB\n")
                f.write(f"name = {pol_name}\n")

                self._export_POL_Characteristics(
                    study, pollutant._data, f, qlog
                )

            ics = study.river.initial_conditions_adists.Initial_Conditions_List
            # A pollutant without initial condition is simply skipped
            # instead of aborting the export with StopIteration.
            pol_ic = next(
                (ic for ic in ics if ic.pollutant == pollutant.id),
                None,
            )

            if pol_ic is not None and pol_ic.concentration is not None:
                # NOTE(review): the INI file is not added to `files`, so it
                # is not listed in the REP file -- confirm this is wanted.
                self._export_ICs_AdisTS(
                    study, repertory, pol_ic, qlog, pol_name
                )

        return files

    def _export_ICs_AdisTS(self, study, repertory, POL_IC_default, qlog,
                           POL_name):
        """Write the ``<POL_name>.INI`` file of a pollutant.

        Args:
            study: Study being exported (unused here).
            repertory: Destination directory.
            POL_IC_default: Initial condition providing ``name``,
                ``concentration``, ``eg``, ``em`` and ``ed``.
            qlog: Optional queue receiving progress messages.
            POL_name: Pollutant name, used for the file name and header.
        """
        if qlog is not None:
            qlog.put("Export POL ICs files")

        ic = POL_IC_default
        path = os.path.join(repertory, f"{POL_name}.INI")

        with adists_file_open(path, "w+") as f:
            f.write(f"*État initial pour le polluant {POL_name}\n")
            f.write(
                f"{ic.name} = {ic.concentration} {ic.eg} "
                f"{ic.em} {ic.ed}\n"
            )

    def _export_POL_Characteristics(self, study, pol_data, f, qlog,
                                    name="0"):
        """Write the ``key = value`` characteristic lines of a pollutant.

        The first row of ``pol_data`` must hold exactly one more value than
        there are characteristics; otherwise nothing is written.

        Args:
            study: Study being exported (unused here).
            pol_data: Pollutant data; only ``pol_data[0]`` is read.
            f: Writable file object of the POL file.
            qlog: Unused.
            name: Unused.
        """
        list_characteristics = [
            "type", "diametre", "rho", "porosity",
            "cdc_riv", "cdc_cas", "apd", "ac", "bc",
        ]

        if len(pol_data) == 0:
            logger.warning("export: POL: no pollutant data, nothing written")
            return

        row = pol_data[0]

        if len(list_characteristics) != len(row) - 1:
            logger.warning(
                "export: POL: unexpected number of characteristics "
                f"({len(row)}), nothing written"
            )
            return

        # zip stops after the last characteristic, so the extra trailing
        # value of the row is not written.
        for key, value in zip(list_characteristics, row):
            f.write(f"{key} = {value}\n")

    def _export_D90(self, study, repertory, qlog=None, name="0"):
        """Write the ``<name>.D90`` file.

        The first entry of the study's D90 list gives the default line; its
        ``_data`` holds the per-reach specifications.

        Returns:
            A one-element list with the D90 file name.
        """
        files = []

        if qlog is not None:
            qlog.put("Export D90 file")

        file_name = f"{name}.D90"
        path = os.path.join(repertory, file_name)

        with adists_file_open(path, "w+") as f:
            files.append(file_name)

            f.write("*Diamètres caractéristiques du fond stable\n")

            default = study.river.d90_adists.D90_AdisTS_List[0]

            f.write(f"{default.name} = {default.d90}\n")

            self._export_d90_spec(study, default._data, f, qlog)

        return files

    def _export_d90_spec(self, study, d90_spec_data, f, qlog, name="0"):
        """Write one line per complete D90 specification.

        A specification with a missing field, or whose reach is not among
        the enabled edges, is skipped; the following ones are still written.

        Args:
            study: Study providing the enabled edges.
            d90_spec_data: Iterable of specifications.
            f: Writable file object of the D90 file.
            qlog: Unused.
            name: Unused.
        """
        # Loop invariant: computed once for all specifications.
        enabled_ids = [edge.id for edge in study.river.enable_edges()]

        for d90_spec in d90_spec_data:
            required = (
                d90_spec.name, d90_spec.reach, d90_spec.start_kp,
                d90_spec.end_kp, d90_spec.d90,
            )
            if any(value is None for value in required):
                continue

            id_reach = d90_spec.reach

            if id_reach not in enabled_ids:
                continue

            f.write(
                f"{d90_spec.name} = {id_reach} "
                f"{d90_spec.start_kp} {d90_spec.end_kp} {d90_spec.d90}\n"
            )

    def _export_DIF(self, study, repertory, qlog=None, name="0"):
        """Write the ``<name>.DIF`` file.

        The first entry of the study's DIF list gives the ``defaut`` line;
        its ``_data`` holds the per-reach specifications.

        Returns:
            A one-element list with the DIF file name.
        """
        files = []

        if qlog is not None:
            qlog.put("Export DIF file")

        file_name = f"{name}.DIF"
        path = os.path.join(repertory, file_name)

        with adists_file_open(path, "w+") as f:
            files.append(file_name)

            f.write("*Définition des paramètres des fonctions de calcul du\n")
            f.write("*coefficient de diffusion\n")

            default = study.river.dif_adists.DIF_AdisTS_List[0]

            line = f"defaut = {default.method} {default.dif}"
            if default.method == "generique":
                # Only the "generique" method carries the b and c values.
                line += f" {default.b} {default.c}"
            f.write(line + "\n")

            self._export_dif_spec(study, default._data, f, qlog)

        return files

    def _export_dif_spec(self, study, dif_spec_data, f, qlog, name="0"):
        """Write one line per complete DIF specification.

        A specification with a missing field, or whose reach is not among
        the enabled edges, is skipped; the following ones are still written.

        Args:
            study: Study providing the enabled edges.
            dif_spec_data: Iterable of specifications.
            f: Writable file object of the DIF file.
            qlog: Unused.
            name: Unused.
        """
        # Loop invariant: computed once for all specifications.
        enabled_ids = [edge.id for edge in study.river.enable_edges()]

        for dif_spec in dif_spec_data:
            required = (
                dif_spec.reach, dif_spec.start_kp, dif_spec.end_kp,
                dif_spec.dif, dif_spec.b, dif_spec.c,
            )
            if any(value is None for value in required):
                continue

            id_reach = dif_spec.reach

            if id_reach not in enabled_ids:
                continue

            line = (
                f"{dif_spec.method} = {id_reach} "
                f"{dif_spec.start_kp} {dif_spec.end_kp} {dif_spec.dif}"
            )
            if dif_spec.method == "generique":
                # The leading space keeps `dif` and `b` apart.
                line += f" {dif_spec.b} {dif_spec.c}"
            f.write(line + "\n")

    def _export_NUM(self, study, repertory, qlog=None, name="0"):
        """Write the ``<name>.NUM`` file.

        Every solver parameter except ``command_line_arguments`` is written
        as ``key = value`` and is followed by the output lines.

        Returns:
            A one-element list with the NUM file name.

        Raises:
            KeyError: If a parameter has no entry in the name table.
        """
        # Parameter name (prefix removed) -> key used in the NUM file.
        dict_names = {
            "init_time": "start_date",
            "final_time": "end_date",
            "timestep": "dt0",
            "implicitation_parameter": "theta",
            "timestep_screen": "dtscr",
            "timestep_bin": "dtbin",
            "timestep_csv": "dtcsv",
            "timestep_mage": "dtMage",
            "initial_concentration": "c_initiale",
        }

        files = []

        if qlog is not None:
            qlog.put("Export NUM file")

        file_name = f"{name}.NUM"
        path = os.path.join(repertory, file_name)

        with adists_file_open(path, "w+") as f:
            files.append(file_name)

            params = study.river.get_params(self.type).parameters
            for p in params:
                # A dedicated local keeps the `name` argument intact.
                key = p.name.replace("all_", "").replace("adists_", "")
                value = p.value

                logger.debug(
                    f"export: NUM: {key}: {value} ({p.value})"
                )

                if key != "command_line_arguments":
                    f.write(f"{dict_names[key]} = {value}\n")

            outputkps = study.river.Output_kp_adists.OutputKp_List

            for outputkp in outputkps:
                self._export_outputkp(study, outputkp, f, qlog)

        return files

    def _export_outputkp(self, study, outputkp, f, qlog, name="0"):
        """Write the ``output = ...`` line of one output point.

        Nothing is written when a field is missing or when the reach is not
        among the enabled edges.

        Args:
            study: Study providing the enabled edges.
            outputkp: Object with ``reach``, ``kp`` and ``title``.
            f: Writable file object of the NUM file.
            qlog: Unused.
            name: Unused.
        """
        id_reach = outputkp.reach
        kp = outputkp.kp
        title = outputkp.title

        if id_reach is None or kp is None or title is None:
            return

        enabled_ids = [edge.id for edge in study.river.enable_edges()]

        if id_reach not in enabled_ids:
            return

        f.write(f"output = {id_reach} {kp} {title}\n")

    def export_func_dict(self):
        """Return the export functions, in call order (a list)."""
        return [
            self._export_NUM,
            self._export_DIF,
            self._export_D90,
            self._export_POLs,
        ]

    @timer
    def export(self, study, repertory, qlog=None, name="0"):
        """Export every input file of the study, then the REP file.

        Args:
            study: Study to export; it is remembered in ``self._study``.
            repertory: Destination directory.
            qlog: Optional queue receiving progress messages.
            name: Ignored; the base name is derived from the study name.

        Returns:
            True on success, False if any step raised (the error is
            logged).
        """
        self._study = study
        name = study.name.replace(" ", "_")

        # Generate files
        files = []

        try:
            for func in self.export_func_dict():
                files = files + func(study, repertory, qlog, name=name)

            self.export_additional_files(study, repertory, qlog, name=name)
            self._export_REP(study, repertory, files, qlog, name=name)

            return True
        except Exception as e:
            # Top-level boundary of the export: report and signal failure.
            logger.error(f"Failed to export study to {self._type}")
            logger_exception(e)
            return False