# BoundaryCondition.py 9.33 KB
# Newer Older
# BoundaryCondition.py -- Pamhyr
# Copyright (C) 2023  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import logging

from tools import trace, timer, old_pamhyr_date_to_timestamp
from Model.DB import SQLSubModel
from Model.Except import NotImplementedMethodeError

# Module-wide logger. getLogger() called without a name returns the
# root logger.
logger = logging.getLogger()

class BoundaryCondition(SQLSubModel):
    """Boundary condition made of an ordered list of two-column rows.

    Each row of ``self._data`` is a ``(value_0, value_1)`` tuple. The
    column titles are read from ``self._header`` and the column
    converters from ``self._types``. Every mutation notifies
    ``self._status.modified()``.
    """

    _sub_classes = []
    _id_cnt = 0

    # Fallback column metadata so the methods below never hit a missing
    # attribute. They are class-level on purpose: a subclass overriding
    # them (as class or instance attributes) keeps its own values.
    # NOTE(review): [float, float] is an assumed default -- confirm
    # against the concrete types in BoundaryConditionTypes.
    _header = []
    _types = [float, float]

    def __init__(self, id: int = -1, name: str = "",
                 status=None):
        """Create a boundary condition.

        Args:
            id: Explicit identifier, or -1 to allocate the next free one
                from the class-wide counter.
            name: Name of the boundary condition.
            status: Object whose ``modified()`` method is called on every
                change of this boundary condition.
        """
        super(BoundaryCondition, self).__init__()

        self._status = status

        # -1 means "no id supplied": take the next value of the counter.
        if id == -1:
            self.id = BoundaryCondition._id_cnt
        else:
            self.id = id

        self._name = name
        self._type = ""
        self._node = None
        self._data = []

        # Keep the counter strictly above every id seen so far, so that
        # an automatically allocated id never collides with a given one.
        BoundaryCondition._id_cnt = max(
            BoundaryCondition._id_cnt + 1, self.id + 1
        )

    @classmethod
    def _sql_create(cls, execute):
        """Create the two tables used to store boundary conditions.

        Args:
            execute: Callable running one SQL statement.

        Returns:
            The result of ``cls._create_submodel(execute)``.
        """
        execute("""
          CREATE TABLE boundary_condition(
            id INTEGER NOT NULL PRIMARY KEY,
            name TEXT NOT NULL,
            type TEXT NOT NULL,
            tab TEXT NOT NULL,
            node INTEGER,
            FOREIGN KEY(node) REFERENCES river_node(id)
          )
        """)

        execute("""
          CREATE TABLE boundary_condition_data(
            id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
            ind INTEGER NOT NULL,
            data0 TEXT NOT NULL,
            data1 TEXT NOT NULL,
            bc INTEGER,
            FOREIGN KEY(bc) REFERENCES boundary_condition(id)
          )
        """)

        return cls._create_submodel(execute)

    @classmethod
    def _sql_update(cls, execute, version):
        """Schema update hook; nothing to update here, always True."""
        return True

    @classmethod
    def _get_ctor_from_type(cls, t):
        """Return the class matching the stored type code ``t``.

        Unknown codes map to ``NotDefined``.
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from Model.BoundaryCondition.BoundaryConditionTypes import (
            NotDefined, PonctualContribution,
            TimeOverZ, TimeOverDischarge, ZOverDischarge,
            Solid,
        )

        # NOTE(review): Solid is imported but no type code is mapped to
        # it, so such rows are built as NotDefined -- confirm whether a
        # code is missing from this table.
        ctors = {
            "PC": PonctualContribution,
            "TZ": TimeOverZ,
            "TD": TimeOverDischarge,
            "ZD": ZOverDischarge,
        }
        return ctors.get(t, NotDefined)

    @classmethod
    def _sql_load(cls, execute, data=None):
        """Load every boundary condition belonging to ``data["tab"]``.

        Args:
            execute: Callable running one SQL statement and returning
                its rows.
            data: Mapping providing ``"tab"``, ``"status"`` and
                ``"nodes"`` (objects exposing an ``id`` attribute).

        Returns:
            The list of loaded boundary conditions.
        """
        new = []
        tab = data["tab"]

        table = execute(
            "SELECT id, name, type, node " +
            "FROM boundary_condition " +
            f"WHERE tab = '{tab}'"
        )

        for row in table:
            ctor = cls._get_ctor_from_type(row[2])
            bc = ctor(
                id=row[0],
                name=row[1],
                status=data['status']
            )

            # -1 is what _sql_save writes when there is no node; the
            # column is nullable too. An id without a matching node
            # leaves the condition unattached instead of raising.
            node_id = row[3]
            node = None
            if node_id is not None and node_id != -1:
                node = next(
                    (n for n in data["nodes"] if n.id == node_id),
                    None
                )
            bc.node = node

            # Materialise the rows: they are walked twice below and a
            # one-shot cursor would be exhausted by the first pass.
            values = list(execute(
                "SELECT ind, data0, data1 FROM boundary_condition_data " +
                f"WHERE bc = '{bc.id}'"
            ))

            # Create dummy data list
            for _ in values:
                bc.add(0)

            # Write data
            for v in values:
                ind = v[0]
                data0 = bc._types[0](v[1])
                data1 = bc._types[1](v[2])
                # Replace data at pos ind
                bc._data[ind] = (data0, data1)

            new.append(bc)

        return new

    def _sql_save(self, execute, data=None):
        """Replace the stored rows of this boundary condition.

        Args:
            execute: Callable running one SQL statement.
            data: Mapping providing ``"tab"``.

        Returns:
            True.
        """
        tab = data["tab"]

        execute(f"DELETE FROM boundary_condition WHERE id = {self.id}")
        execute(f"DELETE FROM boundary_condition_data WHERE bc = {self.id}")

        # -1 stands for "no node" in the node column.
        node = -1
        if self._node is not None:
            node = self._node.id

        sql = (
            "INSERT INTO " +
            "boundary_condition(id, name, type, tab, node) " +
            "VALUES (" +
            f"{self.id}, '{self._sql_format(self._name)}', " +
            f"'{self._sql_format(self._type)}', '{tab}', {node}" +
            ")"
        )
        execute(sql)

        for ind, d in enumerate(self._data):
            data0 = self._sql_format(str(d[0]))
            data1 = self._sql_format(str(d[1]))

            # Both data columns are TEXT: quote both of them so that
            # values that are not plain SQL numbers are stored too.
            sql = (
                "INSERT INTO " +
                "boundary_condition_data(ind, data0, data1, bc) " +
                f"VALUES ({ind}, '{data0}', '{data1}', {self.id})"
            )
            execute(sql)

        return True

    def __len__(self):
        """Return the number of data rows."""
        return len(self._data)

    @classmethod
    def compatibility(cls):
        """Return the list of tab names this class is compatible with."""
        return ["liquid", "solid", "suspenssion"]

    @classmethod
    def time_convert(cls, data):
        """Convert ``data`` to an integer time value.

        A string containing exactly three ``:`` is handed to
        ``old_pamhyr_date_to_timestamp``; anything else goes through
        ``int()``.
        """
        if isinstance(data, str) and data.count(":") == 3:
            return old_pamhyr_date_to_timestamp(data)

        return int(data)

    @property
    def name(self):
        """Name of the boundary condition."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = name
        self._status.modified()

    @property
    def bctype(self):
        """Type code of the boundary condition (read-only)."""
        return self._type

    @property
    def node(self):
        """Node this boundary condition is attached to, or None."""
        return self._node

    @node.setter
    def node(self, node):
        self._node = node
        self._status.modified()

    def has_node(self):
        """Return True when a node is attached."""
        return self._node is not None

    @property
    def header(self):
        """Copy of the column titles."""
        return self._header.copy()

    @property
    def data(self):
        """Shallow copy of the data rows."""
        return self._data.copy()

    def get_type_column(self, column):
        """Return the converter of ``column`` (0 or 1), else None."""
        if 0 <= column < 2:
            return self._types[column]
        return None

    @property
    def _default_0(self):
        """Default value of the first column."""
        return self._types[0](0)

    # Declared as a property like _default_0: callers read it as an
    # attribute, never call it.
    @property
    def _default_1(self):
        """Default value of the second column."""
        return self._types[1](0.0)

    def is_define(self):
        """Return True when the data list is not None."""
        return self._data is not None

    def new_from_data(self, header, data):
        """Build a ``(value_0, value_1)`` row from textual values.

        Args:
            header: Titles of the columns of ``data``. When empty, the
                first two items of ``data`` are taken in order.
            data: Sequence of strings; ``,`` is accepted as decimal
                separator.

        Returns:
            The new row. A column whose title is not found in
            ``header`` keeps its default value.
        """
        new_0 = self._default_0
        new_1 = self._default_1

        if len(header) != 0:
            # Named columns: pick each value by matching its title.
            for j, title in enumerate(header):
                if title == self._header[0]:
                    new_0 = self._types[0](data[j].replace(",", "."))
                if title == self._header[1]:
                    new_1 = self._types[1](data[j].replace(",", "."))
        else:
            # No titles: columns are taken by position.
            new_0 = self._types[0](data[0].replace(",", "."))
            new_1 = self._types[1](data[1].replace(",", "."))

        return (new_0, new_1)

    def add(self, index: int):
        """Insert a default row at ``index`` and return it."""
        value = (self._default_0, self._default_1)
        self._data.insert(index, value)
        self._status.modified()
        return value

    def insert(self, index: int, value):
        """Insert the row ``value`` at ``index``."""
        self._data.insert(index, value)
        self._status.modified()

    def delete_i(self, indexes):
        """Remove the rows whose position is in ``indexes``."""
        self._data = [
            row for i, row in enumerate(self._data)
            if i not in indexes
        ]
        self._status.modified()

    def delete(self, els):
        """Remove every row equal to one of ``els``."""
        self._data = [row for row in self._data if row not in els]
        self._status.modified()

    def sort(self, _reverse=False, key=None):
        """Sort the rows in place, optionally with ``key``."""
        if key is None:
            self._data.sort(reverse=_reverse)
        else:
            self._data.sort(reverse=_reverse, key=key)
        self._status.modified()

    def get_i(self, index):
        """Return the row at ``index``."""
        return self._data[index]

    def get_range(self, _range):
        """Return the items of ``_range`` as a list.

        NOTE(review): this returns the indexes themselves, not the rows
        at those indexes -- confirm this is what callers expect.
        """
        return list(_range)

    def _set_i_c_v(self, index, column, value):
        """Set ``column`` of row ``index`` to ``value`` (converted)."""
        v = list(self._data[index])
        v[column] = self._types[column](value)
        self._data[index] = tuple(v)
        self._status.modified()

    def set_i_0(self, index: int, value):
        """Set the first column of row ``index``."""
        self._set_i_c_v(index, 0, value)

    def set_i_1(self, index: int, value):
        """Set the second column of row ``index``."""
        self._set_i_c_v(index, 1, value)

    def convert(self, cls):
        """Build a ``cls`` instance carrying over this one's content.

        The name, status and node are reused, one default row is
        created per existing row, then every column whose title exists
        in both headers is copied. A value that cannot be converted is
        logged and left at its default.

        Returns:
            The new boundary condition.
        """
        new = cls(name=self.name, status=self._status)
        new.node = self.node

        rows = self.data
        for i, _ in enumerate(rows):
            new.add(i)

        for i in [0, 1]:
            for j in [0, 1]:
                if self._header[i] == new.header[j]:
                    for ind, v in enumerate(rows):
                        try:
                            new._set_i_c_v(ind, j, v[i])
                        except Exception as e:
                            # Best effort: keep the default value.
                            logger.info(e)

        return new

    def move_up(self, index):
        """Swap row ``index`` with the previous one.

        Does nothing for the first row or an out-of-range index.
        """
        if 0 < index < len(self):
            d = self._data
            d[index], d[index - 1] = d[index - 1], d[index]
            self._status.modified()

    def move_down(self, index):
        """Swap row ``index`` with the next one.

        Does nothing for the last row or an out-of-range index.
        """
        if 0 <= index < len(self) - 1:
            d = self._data
            d[index], d[index + 1] = d[index + 1], d[index]
            self._status.modified()