# Mage.py
import os

from tools import timer

from Solver.ASolver import AbstractSolver
from Checker.Mage import MageNetworkGraphChecker
class Mage(AbstractSolver):
    """Solver front-end that exports a study as MAGE input files."""

    def __init__(self, name):
        """Create a MAGE solver entry called *name*.

        Sets the solver type tag to ``"mage"`` and the command templates.
        The solver template uses the placeholders ``@path``, ``@input``
        and ``@output`` (presumably expanded by the base class -- confirm).
        """
        super().__init__(name)

        self._type = "mage"

        # Input and output command templates are empty; only the solver
        # command template carries placeholders.
        self._cmd_input = ""
        self._cmd_solver = "@path @input -o @output"
        self._cmd_output = ""

    @classmethod
    def default_parameters(cls):
        """Return the default solver parameters as ``(name, value)`` pairs.

        The list starts with whatever the parent class provides and is
        extended with the MAGE-specific entries below. Every value is
        given as a string.

        Returns:
            list: ``(name, value)`` tuples, parent defaults first.
        """
        lst = super(Mage, cls).default_parameters()

        lst += [
            ("mage_min_timestep", "1.0"),
            ("mage_timestep_tra", "3600"),
            ("mage_timestep_bin", "0"),
            # ("mage_timestep_melissa", "0"),
            ("mage_implication", "0.70"),
            ("mage_continuity_discretization", "S"),
            ("mage_qsj_discretization", "B"),
            ("mage_stop_criterion_iterations", "R"),
            ("mage_iter_type", "0"),
            ("mage_smooth_coef", "0"),
            ("mage_cfl_max", "-1."),
            ("mage_min_height", "0.1"),
            ("mage_max_niter", "10"),
            ("mage_timestep_reduction_factor", "2"),
            ("mage_precision_reduction_factor_Z", "1"),
            ("mage_precision_reduction_factor_Q", "1"),
            ("mage_niter_max_precision", "99"),
            ("mage_niter_before_switch", "99"),
            ("mage_max_froude", "1.5"),
            ("mage_diffluence_node_height_balance", "-1"),
            ("mage_compute_reach_volume_balance", "y"),
            ("mage_max_reach_volume_balance", "0.001"),
            ("mage_min_reach_volume_to_check", "1000.0"),
        ]
        # NOTE(review): the list literal was left unterminated in the
        # reviewed source; confirm no trailing parameter was lost.

        return lst
    @classmethod
    def checkers(cls):
        """Return the network-graph checkers used for this solver.

        Two ``MageNetworkGraphChecker`` instances are built: the first with
        ``connectivity=True``, the second with ``connectivity=False``.
        """
        return [
            MageNetworkGraphChecker(connectivity=flag)
            for flag in (True, False)
        ]

    ##########
    # Export #
    ##########

    def input_param(self):
        """Return the solver input file name (the REP file, ``0.REP``)."""
        return "0.REP"

    def log_file(self):
        """Return the log file name (``0.TRA``, listed as TRA output in the REP file)."""
        return "0.TRA"

    def _export_ST(self, study, repertory, qlog):
        files = []

        if qlog is not None:
            qlog.put("Export ST file")

        # Write header
        edges = study.river.edges()
        edges = list(
            filter(
                lambda e: e.is_enable(),
                edges
            )
        )

        for edge in edges:
            name = edge.name.replace(" ", "_")
            if edge._name == "":
                name = f"Reach_{edge.id}"

            with open(os.path.join(repertory, f"{name}.ST"), "w+") as f:
                files.append(f"{name}.ST")
                f.write("* This file is generate by PAMHYR, please don't modify\n")

                for profile in edge.reach.profiles:
                    num = f"{profile.num:>6}"
                    c1 = f"{profile.code1:>6}"
                    c2 = f"{profile.code2:>6}"
                    t = f"{len(profile.points):>6}"
                    kp = f"{profile.kp:>13.4f}"
                    name = profile.name

                    f.write(f"{num}{c1}{c2}{t}{kp}  {name}\n")

                    for point in profile.points:
                        x = f"{point.x:>13.4f}"
                        y = f"{point.y:>13.4f}"
                        z = f"{point.z:>13.4f}"
                        n = point.name

                        f.write(f"{x}{y}{z} {n}\n")

                    f.write(f"     999.9990     999.9990     999.9990\n")

    def _export_BC(self, t, bounds, repertory, qlog):
        if len(bounds) == 0:
            return files

        if qlog is not None:
            qlog.put(f"Export {t} file")

        with open(os.path.join(repertory, f"0.{t}"), "w+") as f:
            files.append(f"0.{t}")

            f.write("* This file is generate by PAMHYR, please don't modify\n")

            for bound in bounds:
                name = f"{bound.node.id:3}".replace(" ", "x")
                f.write(f"* {bound.node.name} ({name}) {bound.bctype}\n")
                f.write(f"${name}\n")
                header = bound.header
                f.write(f"*{header[0]:>9}|{header[1]:>10}\n")
                for d in bound.data:
                    f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n")
        return files
    def _export_bound_cond(self, study, repertory, qlog):
        """Sort the boundary conditions by type and export them.

        Every boundary condition found in the ``"liquid"``, ``"solid"`` and
        ``"suspenssion"`` tabs is dispatched on its ``bctype``:

        * ``"ZD"`` goes to the AVA file,
        * ``"TD"`` and ``"PC"`` go to the HYD file,
        * ``"TZ"`` goes to the LIM file.

        Other types are ignored.

        Args:
            study: Study providing ``river.boundary_condition``.
            repertory: Directory in which the files are created.
            qlog: Optional queue-like object used for progress messages.

        Returns:
            list: Names of the files written by ``_export_BC``.
        """
        files = []
        lst = study.river.boundary_condition

        # One bucket per output file.
        AVA = []
        HYD = []
        LIM = []

        for tab in ["liquid", "solid", "suspenssion"]:
            for bound in lst.get_tab(tab):
                if bound.bctype == "ZD":
                    AVA.append(bound)
                elif bound.bctype in ("TD", "PC"):
                    HYD.append(bound)
                elif bound.bctype == "TZ":
                    LIM.append(bound)

        files = files + self._export_BC("AVA", AVA, repertory, qlog)
        files = files + self._export_BC("HYD", HYD, repertory, qlog)
        files = files + self._export_BC("LIM", LIM, repertory, qlog)

        return files
    # @timer
    # def _export_LC(self, lateral, repertory, qlog):
    #     files = []

    #     if qlog is not None:
    #         qlog.put(f"Export LAT file")

    #     with open(os.path.join(repertory, f"0.LAT"), "w+") as f:
    #         files.append(f"0.LAT")

    #         f.write("* This file is generate by PAMHYR, please don't modify\n")

    #         name = f"{lateral.node.id:3}".replace(" ", "x")
    #         f.write(f"* {lateral.node.name} ({name}) {lateral.bctype}\n")
    #         f.write(f"${name}\n")
    #         header = lateral.header
    #         f.write(f"*{header[0]:>9}|{header[1]:>10}\n")

    #         for d in lateral.data:
    #             f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n")

    #     return files

    # @timer
    # def _export_lateral_contrib(self, study, repertory, qlog):
    #     files = []
    #     lst = study.river.lateral_contribution

    #     for tab in ["liquid", "solid", "suspenssion"]:
    #         for lateral in lst.get_tab(tab):
    #             files = files + self._export_LC(lateral, repertory,  qlog)

    #     return files

    def _export_RUG(self, study, repertory, qlog):
        files = []

        if qlog is not None:
            qlog.put("Export RUG file")

        # Write header
        with open(os.path.join(repertory, "0.RUG"), "w+") as f:
            files.append("0.RUG")
            f.write("* This file is generate by PAMHYR, please don't modify\n")

            edges = study.river.edges()
            edges = list(
                filter(
                    lambda e: e.is_enable(),
                    edges
                )
            )

            id = 1
            for edge in edges:
                frictions = edge.frictions
                for friction in frictions.frictions:
                    num = f"{id:>3}"
                    bkp = f"{friction.begin_kp:>10.3f}"
                    ekp = f"{friction.end_kp:>10.3f}"
                    # if friction.begin_kp != friction.end_kp:
                    strickler = friction.begin_strickler
                    coef_1 = f"{strickler.minor:>10.3f}"
                    coef_2 = f"{strickler.medium:>10.3f}"

                    f.write(f"K{num}      {bkp}{ekp}{coef_1}{coef_2}\n")
        return files

    @timer
    def _export_INI(self, study, repertory, qlog):
        """Write the initial conditions of enabled reaches to ``0.INI``.

        Each row carries the 1-based reach number, the 1-based row number
        within the reach, then the discharge, elevation and kp read from
        the ``'discharge'``, ``'elevation'`` and ``'kp'`` keys of the row.
        Reaches without initial-condition rows are skipped.

        Args:
            study: Study providing ``river.edges()`` and
                ``river.initial_conditions``.
            repertory: Directory in which the file is created.
            qlog: Optional queue-like object used for progress messages.

        Returns:
            list: ``["0.INI"]`` if at least one reach had rows, otherwise
            ``[]`` (the file is still created, but not listed).
        """
        files = []

        if qlog is not None:
            qlog.put("Export INI file")

        # Defined up front so the check after the ``with`` block is always
        # valid, even when no reach has initial conditions.
        has_ini = False

        with open(os.path.join(repertory, "0.INI"), "w+") as f:
            reachs = [e for e in study.river.edges() if e.is_enable()]

            f.write("* This file is generate by PAMHYR, please don't modify\n")
            # TODO put real date...
            f.write("$ date en minutes :       0.00\n")
            f.write("* IB IS    discharge  elevation         kp\n")

            # Reach numbers follow the position among enabled edges.
            for reach_num, reach in enumerate(reachs, start=1):
                cond = study.river.initial_conditions.get(reach)
                # NOTE(review): assumes the per-reach initial-condition
                # object exposes its rows as ``.data`` -- confirm.
                data = cond.data
                if len(data) == 0:
                    continue

                has_ini = True
                IR = f"{reach_num:>3}"

                for section_num, d in enumerate(data, start=1):
                    IS = f"{section_num:>3}"
                    discharge = f"{d['discharge']:>10.5f}"
                    elevation = f"{d['elevation']:>11.6f}"
                    kp = f"{d['kp']:>9.2f}"

                    f.write(f" {IR} {IS}   {discharge}{elevation} {kp}\n")

        if has_ini:
            files.append("0.INI")

        return files
    def _export_REP(self, study, repertory, files, qlog):
        if qlog is not None:
            qlog.put("Export REP file")

        # Write header
        with open(os.path.join(repertory, f"0.REP"), "w+") as f:
            f.write("confirmation=non\n")
            f.write("* This file is generate by PAMHYR, please don't modify\n")

            for file in files:
                EXT = file.split('.')[1]

                f.write(f"{EXT} {file}\n")

            f.write("* OUTPUT\n")
            f.write(f"TRA 0.TRA\n")
            f.write(f"BIN 0.BIN\n")

    @timer
    def export(self, study, repertory, qlog=None):
        """Export *study* into *repertory*.

        This implementation only writes the ST files through
        ``_export_ST`` and always returns True.
        """
        self._export_ST(study, repertory, qlog)

        return True

##########
# MAGE 7 #
##########


# NOTE(review): the class header was missing from the reviewed source; the
# base class ``Mage`` is inferred from the ``super(Mage7, ...)`` calls and
# the surrounding file -- confirm.
class Mage7(Mage):
    """MAGE solver variant identified by the type tag ``"mage7"``."""

    def __init__(self, name):
        """Create the solver entry called *name* and set its type tag."""
        super(Mage7, self).__init__(name)

        self._type = "mage7"

    @classmethod
    def default_parameters(cls):
        """Return the default parameters, unchanged from the parent class."""
        lst = super(Mage7, cls).default_parameters()

        return lst

##########
# MAGE 8 #
##########


# NOTE(review): the class header was missing from the reviewed source. The
# base class ``Mage`` is inferred from the ``super(Mage8, ...)`` calls and
# from the use of ``Mage`` parameter names and export helpers -- confirm.
class Mage8(Mage):
    """MAGE solver variant identified by the type tag ``"mage8"``."""

    def __init__(self, name):
        """Create the solver entry called *name* and set its type tag."""
        super(Mage8, self).__init__(name)

        self._type = "mage8"

    @classmethod
    def default_parameters(cls):
        """Return the parent defaults plus the extra precision factor.

        ``("mage_precision_reduction_factor_r", "1")`` is inserted right
        after the ``mage_precision_reduction_factor_Q`` entry.

        Returns:
            list: ``(name, value)`` tuples.

        Raises:
            ValueError: If the parent list has no
                ``mage_precision_reduction_factor_Q`` entry.
        """
        lst = super(Mage8, cls).default_parameters()

        # Insert new parameters at specific position
        names = [t[0] for t in lst]
        i = names.index("mage_precision_reduction_factor_Q")
        lst.insert(i + 1, ("mage_precision_reduction_factor_r", "1"))

        return lst


    ##########
    # Export #
    ##########

    @timer
    def _export_PAR(self, study, repertory, qlog = None):
        """Write the solver parameters to ``0.PAR``.

        One ``<name> <value>`` line is written per parameter, with the
        substrings ``"all_"`` and ``"mage_"`` removed from the name.

        Args:
            study: Study providing ``river.get_params(self.type)``.
            repertory: Directory in which the file is created.
            qlog: Optional queue-like object used for progress messages.

        Returns:
            list: ``["0.PAR"]``.
        """
        files = []

        if qlog is not None:
            qlog.put("Export PAR file")

        with open(os.path.join(repertory, "0.PAR"), "w+") as f:
            files.append("0.PAR")

            f.write("* This file is generate by PAMHYR, please don't modify\n")

            params = study.river.get_params(self.type).parameters
            for p in params:
                name = p.name\
                        .replace("all_", "")\
                        .replace("mage_", "")
                value = p.value

                f.write(f"{name} {value}\n")

        return files


    @timer
    def _export_NET(self, study, repertory, qlog = None):
        """Write the network description of enabled reaches to ``0.NET``.

        One line is written per reach: an identifier, the two end-node
        tags (node id padded to width 3 with ``x``) and the reach's
        ``.ST`` file name.

        Args:
            study: Study whose ``river.edges()`` are exported; only edges
                for which ``is_enable()`` is true are written.
            repertory: Directory in which the file is created.
            qlog: Optional queue-like object used for progress messages.

        Returns:
            list: ``["0.NET"]``.
        """
        files = []

        if qlog is not None:
            qlog.put("Export NET file")

        with open(os.path.join(repertory, "0.NET"), "w+") as f:
            files.append("0.NET")

            f.write("* This file is generate by PAMHYR, please don't modify\n")

            edges = [e for e in study.river.edges() if e.is_enable()]

            for e in edges:
                name = e.name.replace(" ", "_")
                if e._name == "":
                    name = f"Reach_{e.id}"

                # NOTE(review): the reviewed source formatted the builtin
                # ``id`` here, writing "<built-in function id>" to the
                # file. The sanitized reach name is used as identifier
                # instead -- confirm the identifier the solver expects.
                reach_id = name

                n1 = f"{e.node1.id:3}".replace(" ", "x")
                n2 = f"{e.node2.id:3}".replace(" ", "x")
                file = name + ".ST"

                f.write(f"{reach_id} {n1} {n2} {file}\n")

        return files
    @timer
    def export(self, study, repertory, qlog = None):
        """Export *study* as a complete set of input files in *repertory*.

        The ST files are written first, then the PAR, NET,
        boundary-condition, RUG and INI files, and finally the REP index
        listing the files reported by those steps.

        Args:
            study: Study to export.
            repertory: Directory in which the files are created.
            qlog: Optional queue-like object used for progress messages.

        Returns:
            bool: Always True.
        """
        # Names of the files to list in the REP index.
        files = []

        # ST files are not listed in REP here; 0.NET references them.
        self._export_ST(study, repertory, qlog)

        exporters = (
            self._export_PAR,
            self._export_NET,
            self._export_bound_cond,
            self._export_RUG,
            self._export_INI,
        )
        for exporter in exporters:
            # ``or []`` tolerates a step that reports no file (None).
            files.extend(exporter(study, repertory, qlog) or [])

        self._export_REP(study, repertory, files, qlog)

        return True