AdisTS.py 5.24 KB
Newer Older
Youcef Aouad's avatar
Youcef Aouad committed
# AdisTS.py -- Pamhyr
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging
import numpy as np

from tools import timer, trace, logger_exception

from Solver.CommandLine import CommandLineSolver

from Model.Results.Results import Results
from Model.Results.River.River import River, Reach, Profile

logger = logging.getLogger()

def adists_file_open(filepath, mode):
    f = open(filepath, mode)

    if "w" in mode:
        # Write header
        comment = "*"
        if ".ST" in filepath:
            comment = "#"

        f.write(
            f"{comment} " +
            "This file is generated by PAMHYR, please don't modify\n"
        )

    return f

class AdisTS(CommandLineSolver):
    _type = "adists"

    def __init__(self, name):
        super(AdisTS, self).__init__(name)

        self._type = "adists"

        self._cmd_input = ""
        self._cmd_solver = "@path @input -o @output"
        self._cmd_output = ""

    @classmethod
    def default_parameters(cls):
        lst = super(AdisTS, cls).default_parameters()

        lst += [
            ("adists_implicitation_parameter", "0.5"),
            ("adists_timestep_screen", "60"),
            ("adists_timestep_bin", "60"),
            ("adists_timestep_csv", "60"),
            ("adists_timestep_mage", "60"),
            ("adists_initial_concentration", "60"),
            ("adists_output_points_csv", ""),
        ]

        return lst

    def cmd_args(self, study):
        lst = super(AdisTS, self).cmd_args(study)
        return lst

    def input_param(self):
        name = self._study.name
        return f"{name}.REP"

    def _export_REP_additional_lines(self, study, rep_file):
        lines = filter(
            lambda line: line.is_enabled(),
            study.river.rep_lines.lines
        )

        for line in lines:
            rep_file.write(line.line)

    def _export_REP(self, study, repertory, files, qlog, name="0"):
        if qlog is not None:
            qlog.put("Export REP file")

        # Write header
        with adists_file_open(
                os.path.join(
                    repertory, f"{name}.REP"
                ), "w+"
        ) as f:
            f.write("confirmation=non\n")

            for file in files:
                EXT = file.split('.')[1]
                f.write(f"{EXT} {file}\n")

            self._export_REP_additional_lines(study, f)

    @timer
    def export(self, study, repertory, qlog=None):
        self._study = study
        name = study.name.replace(" ", "_")

        self.export_additional_files(study, repertory, qlog, name=name)

        return True

################################
# Adis-TS in low coupling mode #
################################


class AdisTSlc(AdisTS):
    _type = "adistslc"

    def __init__(self, name):
        super(AdisTSlc, self).__init__(name)

        self._type = "adistslc"

    @classmethod
    def default_parameters(cls):
        lst = super(AdisTSlc, cls).default_parameters()

        # Insert new parameters at specific position
        names = list(map(lambda t: t[0], lst))

        return lst

    ##########
    # Export #
    ##########

    def cmd_args(self, study):
        lst = super(AdisTSlc, self).cmd_args(study)

        return lst

    def _export_NUM(self, study, repertory, qlog=None, name="0"):
        files = []

        if qlog is not None:
            qlog.put("Export NUM file")

        with adists_file_open(os.path.join(repertory, f"{name}.NUM"), "w+") as f:
            files.append(f"{name}.NUM")

            params = study.river.get_params(self.type).parameters
            for p in params:
                name = p.name\
                        .replace("all_", "")\
                        .replace("adists_", "")
                value = p.value

                logger.debug(
                    f"export: NUM: {name}: {value} ({p.value})"
                )

                f.write(f"{name} {value}\n")

        return files

    def export_func_dict(self):
        return [
            self._export_NUM,
        ]


    @timer
    def export(self, study, repertory, qlog=None, name="0"):
        self._study = study
        name = study.name.replace(" ", "_")

        # Generate files
        files = []

        try:
            for func in self.export_func_dict():
                files = files + func(study, repertory, qlog, name=name)

            self.export_additional_files(study, repertory, qlog, name=name)
            self._export_REP(study, repertory, files, qlog, name=name)

            return True
        except Exception as e:
            logger.error(f"Failed to export study to {self._type}")
            logger_exception(e)
            return False