Mage.py 9.81 KiB
# Mage.py -- Pamhyr
# Copyright (C) 2023  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import logging
import tempfile

from ctypes import (
    cdll,
    byref, Structure,
    c_char_p, c_wchar_p,
    create_string_buffer,
    POINTER, c_void_p,
    c_int, c_double, c_bool
)
from PyQt5.QtCore import QProcess

from tools import logger_color_red, logger_color_reset
from Meshing.AMeshingTool import AMeshingTool

logger = logging.getLogger()


class MeshingWithMage(AMeshingTool):
    def __init__(self):
        super(MeshingWithMage, self).__init__()

        self._init_bief_lib()

    def _init_bief_lib(self):
        self._bief_lib = cdll.LoadLibrary(self._lib_path())

        self._init_c_init_bief_from_geo_file()
        self._init_c_get_nb_sections()
        self._init_c_get_nb_points_section()
        self._init_c_set_bief_name()
        self._init_c_st_to_m_compl()
        self._init_c_interpolate_profils_pas_transversal()
        self._init_c_purge()
        self._init_c_output_bief()

    @classmethod
    def _lib_path(cls):
        ext = "so" if os.name == "posix" else "dll"

        return os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                "..", "..", "..", "mage", f"libbief.{ext}"
            )
        )

    def _init_c_init_bief_from_geo_file(self):
        self._c_init_bief_from_geo_file = getattr(
            self._bief_lib, 'c_init_bief_from_geo_file'
        )
        self._c_init_bief_from_geo_file.argtypes = [
            c_char_p, POINTER(c_int), POINTER(c_int)
        ]
        self._c_init_bief_from_geo_file.restype = None

    def _init_c_get_nb_sections(self):
        self._c_get_nb_sections = getattr(self._bief_lib, 'c_get_nb_sections')
        self._c_get_nb_sections.argtypes = [POINTER(c_int)]
        self._c_get_nb_sections.restype = None

    def _init_c_get_nb_points_section(self):
        self._c_get_nb_points_section = getattr(
            self._bief_lib, 'c_get_nb_points_section'
        )
        self._c_get_nb_points_section.argtypes = [
            POINTER(c_int), POINTER(c_int)
        ]
        self._c_get_nb_points_section.restype = None

    def _init_c_set_bief_name(self):
        self._c_set_bief_name = getattr(self._bief_lib, 'c_set_bief_name')
        self._c_set_bief_name.argtypes = [c_char_p]
        self._c_set_bief_name.restype = None

    def _init_c_st_to_m_compl(self):
        self._c_st_to_m_compl = getattr(self._bief_lib, 'c_st_to_m_compl')
        self._c_st_to_m_compl.argtypes = [POINTER(c_int), c_char_p, c_char_p]
        self._c_st_to_m_compl.restype = None

    def _init_c_interpolate_profils_pas_transversal(self):
        self._c_interpolate_profils_pas_transversal = getattr(
            self._bief_lib, 'c_interpolate_profils_pas_transversal'
        )
        self._c_interpolate_profils_pas_transversal.argtypes = [
            POINTER(c_int), POINTER(c_int),
            c_char_p, c_char_p,
            POINTER(c_double), POINTER(c_bool),
            POINTER(c_int), POINTER(c_bool)
        ]
        self._c_interpolate_profils_pas_transversal.restype = None

    def _init_c_purge(self):
        self._c_purge = getattr(self._bief_lib, 'c_purge')
        self._c_purge.argtypes = None
        self._c_purge.restype = None

    def _init_c_output_bief(self):
        self._c_output_bief = getattr(self._bief_lib, 'c_output_bief')
        self._c_output_bief.argtypes = [c_char_p]
        self._c_output_bief.restype = None

    #####################
    # Binding functions #
    #####################

    def init_bief_from_geo_file(self, name, with_charriage, with_water):
        cname = create_string_buffer(name.encode())
        self._c_init_bief_from_geo_file(
            cname,
            byref(c_int(with_charriage)),
            byref(c_int(with_water))
        )

    def get_nb_sections(self):
        nb_sections = c_int(0)
        self._c_get_nb_sections(byref(nb_sections))
        return nb_sections.value

    def get_nb_points_section(self, section):
        nb_points = c_int(0)
        self._c_get_nb_points_section(
            byref(c_int(section)), byref(nb_points)
        )
        return nb_points.value

    def set_bief_name(self, name):
        cname = create_string_buffer(name.encode())
        self._c_set_bief_name(cname)

    def st_to_m_compl(self, npoints, tag1='   ', tag2='   '):
        cnpoints = c_int(npoints)
        ctag1 = create_string_buffer(tag1.encode())
        ctag2 = create_string_buffer(tag2.encode())

        self._c_st_to_m_compl(byref(cnpoints), ctag1, ctag2)

    def interpolate_profils_pas_transversal(self, limite1, limite2,
                                            directrice1, directrice2,
                                            pas, lplan=False,
                                            lm=3, lineaire=False):
        climite1 = c_int(limite1)
        climite2 = c_int(limite2)
        cpas = c_double(pas)
        clplan = c_bool(lplan)
        clm = c_int(lm)
        clineaire = c_bool(lineaire)
        cdirectrice1 = create_string_buffer(directrice1.encode())
        cdirectrice2 = create_string_buffer(directrice2.encode())

        self._c_interpolate_profils_pas_transversal(
            byref(climite1), byref(climite2),
            cdirectrice1, cdirectrice2,
            byref(cpas), byref(clplan),
            byref(clm), byref(clineaire)
        )

    def output_bief(self, name):
        cname = create_string_buffer(name.encode())
        self._c_output_bief(cname)

    ###########
    # Meshing #
    ###########

    def meshing(self, reach, step: float = 50):
        if reach is None or len(reach.profiles) == 0:
            return reach

        with tempfile.TemporaryDirectory() as tmp:
            st_file = self.export_reach_to_st(reach, tmp)
            m_file = st_file.rsplit(".ST", 1)[0] + ".M"

            self.load_st_file(st_file)

            ns, npts_max = self.get_reach_stat()

            gl = reach.compute_guidelines()

            # we make sure that the lines are in the left-to-right order
            guide_list = [
                x.name
                for x in reach.profiles[0].named_points()
                if x.name in gl[0]
            ]

            self.complete_cross_section(guide_list)
            self.interpolate_cross_section(ns, step)
            self.purge()

            self.export_to_m(m_file)

            self.import_m_file(reach, m_file)
            return reach

    def export_reach_to_st(self, reach, tmp):
        tmp_st = os.path.join(tmp, "meshing.ST")

        logger.debug(f"meshing: Export ST to {tmp_st}")

        reach.export_reach(tmp_st)
        return tmp_st

    def load_st_file(self, st):
        self.init_bief_from_geo_file(st, 0, 0)
        self.set_bief_name("tmp")

    def get_reach_stat(self):
        ns = self.get_nb_sections()
        npts_max = max(
            map(
                lambda i: self.get_nb_points_section(i),
                range(1, ns)
            )
        )

        return ns, npts_max

    def complete_cross_section(self, gl=[]):
        gl1 = ["un"] + gl
        gl2 = gl + ["np"]

        for gls in zip(gl1, gl2):
            self.st_to_m_compl(0, gls[0], gls[1])

    def interpolate_cross_section(self, ns, step: float):
        self.interpolate_profils_pas_transversal(
            1, ns,
            'un', 'np',
            step
        )

    def purge(self):
        self._c_purge()

    def export_to_m(self, m):
        self.output_bief(m)

    def import_m_file(self, reach, m):
        reach.purge()

        logger.debug(f"meshing: Import geometry from {m}")
        reach.import_geometry(m)


class MeshingWithMageMailleurTT(AMeshingTool):
    def __init__(self):
        super(MeshingWithMageMailleurTT, self).__init__()

    @classmethod
    def _exe_path(cls):
        ext = "" if os.name == "posix" else ".exe"

        return os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                "..", "..", "..", "mage", f"mailleurTT{ext}"
            )
        )

    ###########
    # Meshing #
    ###########

    def meshing(self, reach, step: float = 50):
        if reach is None or len(reach.profiles) == 0:
            return reach

        with tempfile.TemporaryDirectory() as tmp:
            logger.debug(f"temp file: {tmp}")
            st_file = self.export_reach_to_st(reach, tmp)
            m_file = st_file.rsplit(".ST", 1)[0] + ".M"

            proc = QProcess()
            proc.setWorkingDirectory(tmp)

            logger.info(f"! mailleurTT {st_file} {m_file} {str(step)}")
            proc.start(
                self._exe_path(), [st_file, m_file, str(step)]
            )
            proc.waitForFinished()

            errors = proc.readAllStandardError()
            if len(errors) != 0:
                logger.error(
                    f"{logger_color_red()}{errors}{logger_color_reset()}"
                )
            else:
                self.import_m_file(reach, m_file)
            return reach

    def export_reach_to_st(self, reach, tmp):
        tmp_st = os.path.join(tmp, "meshing.ST")

        logger.debug(f"meshing: Export ST to {tmp_st}")

        reach.export_reach(tmp_st)
        return tmp_st

    def import_m_file(self, reach, m):
        reach.purge()

        logger.debug(f"meshing: Import geometry from {m}")
        reach.import_geometry(m)