Forked from HYCAR-Hydro / airGR
Source project has a limited visibility.
LateralContribution.py 10.76 KiB
# LateralContribution.py -- Pamhyr
# Copyright (C) 2023  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import logging

from tools import trace, timer, old_pamhyr_date_to_timestamp

from Model.Tools.PamhyrDB import SQLSubModel
from Model.Except import NotImplementedMethodeError

logger = logging.getLogger()


class LateralContribution(SQLSubModel):
    _sub_classes = []
    _id_cnt = 0

    def __init__(self, id: int = -1, name: str = "", status=None):
        super(LateralContribution, self).__init__()

        self._status = status

        if id == -1:
            self.id = LateralContribution._id_cnt
        else:
            self.id = id

        self._name = name
        self._type = ""
        self._edge = None
        self._begin_kp = 0.0
        self._end_kp = 0.0
        self._data = []
        self._header = []
        self._types = [float, float]

        LateralContribution._id_cnt = max(
            LateralContribution._id_cnt + 1, self.id)

    @classmethod
    def _db_create(cls, execute):
        execute("""
          CREATE TABLE lateral_contribution(
            id INTEGER NOT NULL PRIMARY KEY,
            name TEXT NOT NULL,
            type TEXT NOT NULL,
            tab TEXT NOT NULL,
            edge INTEGER,
            begin_kp REAL NOT NULL,
            end_kp REAL NOT NULL,
            FOREIGN KEY(edge) REFERENCES river_reach(id)
          )
        """)

        execute("""
          CREATE TABLE lateral_contribution_data(
            id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
            ind INTEGER NOT NULL,
            data0 TEXT NOT NULL,
            data1 TEXT NOT NULL,
            lc INTEGER,
            FOREIGN KEY(lc) REFERENCES lateral_contribution(id)
          )
        """)

        return cls._create_submodel(execute)

    @classmethod
    def _db_update(cls, execute, version):
        return True

    @classmethod
    def _get_ctor_from_type(cls, t):
        from Model.LateralContribution.LateralContributionTypes import (
            NotDefined, LateralContrib, Rain, Evaporation
        )

        res = NotDefined
        if t == "LC":
            res = LateralContrib
        elif t == "RA":
            res = Rain
        elif t == "EV":
            res = Evaporation
        return res

    @classmethod
    def _db_load(cls, execute, data=None):
        new = []
        tab = data["tab"]

        table = execute(
            "SELECT id, name, type, edge, begin_kp, end_kp " +
            f"FROM lateral_contribution WHERE tab = '{tab}'"
        )

        for row in table:
            t = row[2]
            ctor = cls._get_ctor_from_type(t)
            lc = ctor(
                id=row[0],
                name=row[1],
                status=data['status']
            )
            lc.edge = None
            if row[3] != -1:
                lc.edge = next(filter(lambda e: e.id == row[3], data["edges"]))
            lc._begin_kp = row[4]
            lc._end_kp = row[5]

            values = execute(
                "SELECT ind, data0, data1 FROM lateral_contribution_data " +
                f"WHERE lc = '{lc.id}'"
            )
            # Create dummy data list
            for _ in values:
                lc.add(0)
            # Write data
            for v in values:
                ind = v[0]
                data0 = lc._types[0](v[1])
                data1 = lc._types[1](v[2])
                # Replace data at pos ind
                lc._data[ind] = (data0, data1)

            new.append(lc)

        return new

    def _db_save(self, execute, data=None):
        tab = data["tab"]

        execute(f"DELETE FROM lateral_contribution WHERE id = {self.id}")
        execute(f"DELETE FROM lateral_contribution_data WHERE lc = {self.id}")

        edge = -1
        if self._edge is not None:
            edge = self._edge.id

        sql = (
            "INSERT INTO " +
            "lateral_contribution(id, name, type, tab, " +
            "edge, begin_kp, end_kp) " +
            "VALUES (" +
            f"{self.id}, '{self._db_format(self._name)}', " +
            f"'{self._db_format(self._type)}', '{tab}', {edge}, " +
            f"{self._begin_kp}, {self._end_kp}" +
            ")"
        )
        execute(sql)

        ind = 0
        for d in self._data:
            data0 = self._db_format(str(d[0]))
            data1 = self._db_format(str(d[1]))

            sql = (
                "INSERT INTO " +
                "lateral_contribution_data(ind, data0, data1, lc) " +
                f"VALUES ({ind}, '{data0}', {data1}, {self.id})"
            )
            execute(sql)
            ind += 1

        return True

    def __len__(self):
        return len(self._data)

    @classmethod
    def compatibility(cls):
        return ["liquid", "solid", "suspenssion"]

    @classmethod
    def time_convert(cls, data):
        if type(data) is str:
            if data.count(":") == 3:
                return old_pamhyr_date_to_timestamp(data)
            if data.count(":") == 2:
                return old_pamhyr_date_to_timestamp("00:" + data)
            if data.count(".") == 1:
                return round(float(data))

        return int(data)

    @property
    def name(self):
        if self._name == "":
            return f"L{self.id + 1}"

        return self._name

    @name.setter
    def name(self, name):
        self._name = name
        self._status.modified()

    @property
    def lctype(self):
        return self._type

    @property
    def edge(self):
        return self._edge

    @edge.setter
    def edge(self, edge):
        self._edge = edge
        if edge is not None:
            self._begin_kp = self._edge.reach.get_kp_min()
            self._end_kp = self._edge.reach.get_kp_max()
        self._status.modified()

    def has_edge(self):
        return self._edge is not None

    @property
    def begin_kp(self):
        return self._begin_kp

    @begin_kp.setter
    def begin_kp(self, begin_kp):
        if self._edge is None:
            self._begin_kp = begin_kp
        else:
            _min = self._edge.reach.get_kp_min()
            _max = self._edge.reach.get_kp_max()

            if _min <= begin_kp <= _max:
                self._begin_kp = begin_kp

        self._status.modified()

    @property
    def end_kp(self):
        return self._end_kp

    @end_kp.setter
    def end_kp(self, end_kp):
        if self._edge is None:
            self._end_kp = end_kp
        else:
            _min = self._edge.reach.get_kp_min()
            _max = self._edge.reach.get_kp_max()

            if _min <= end_kp <= _max:
                self._end_kp = end_kp

        self._status.modified()

    @property
    def header(self):
        return self._header.copy()

    @property
    def data(self):
        return self._data.copy()

    def get_type_column(self, column):
        if 0 <= column < 2:
            return self._types[column]
        return None

    @property
    def _default_0(self):
        return self._types[0](0)

    @property
    def _default_1(self):
        return self._types[1](0.0)

    def is_define(self):
        return self._data is not None

    def new_from_data(self, header, data):
        new_0 = self._default_0
        new_1 = self._default_1

        if len(header) != 0:
            for i in [0, 1]:
                for j in range(len(header)):
                    if self._header[i] == header[j]:
                        if i == 0:
                            new_0 = self._types[i](data[j].replace(",", "."))
                        else:
                            new_1 = self._types[i](data[j].replace(",", "."))
        else:
            new_0 = self._types[0](data[0].replace(",", "."))
            new_1 = self._types[1](data[1].replace(",", "."))

        return (new_0, new_1)

    def add(self, index: int):
        value = (self._default_0, self._default_1)
        self._data.insert(index, value)
        self._status.modified()
        return value

    def insert(self, index: int, value):
        self._data.insert(index, value)
        self._status.modified()

    def delete_i(self, indexes):
        self._data = list(
            map(
                lambda e: e[1],
                filter(
                    lambda e: e[0] not in indexes,
                    enumerate(self.data)
                )
            )
        )
        self._status.modified()

    def delete(self, els):
        self._data = list(
            filter(
                lambda e: e not in els,
                self.data
            )
        )
        self._status.modified()

    def sort(self, _reverse=False, key=None):
        if key is None:
            self._data.sort(reverse=_reverse)
        else:
            self._data.sort(reverse=_reverse, key=key)
        self._status.modified()

    def get_i(self, index):
        return self.data[index]

    def get_range(self, _range):
        lst = []
        for r in _range:
            lst.append(r)
        return lst

    def _set_i_c_v(self, index, column, value):
        v = list(self._data[index])
        v[column] = self._types[column](value)
        self._data[index] = tuple(v)
        self._status.modified()

    def set_i_0(self, index: int, value):
        self._set_i_c_v(index, 0, value)

    def set_i_1(self, index: int, value):
        self._set_i_c_v(index, 1, value)

    @timer
    def convert(self, cls):
        new = cls(name=self.name, status=self._status)
        new.edge = self.edge
        new.begin_kp = self.begin_kp
        new.end_kp = self.end_kp

        for i, _ in enumerate(self.data):
            new.add(i)

        for i in [0, 1]:
            for j in [0, 1]:
                if self._header[i] == new.header[j]:
                    for ind, v in self.data:
                        try:
                            new._set_i_c_v(ind, j, v[i])
                        except Exception as e:
                            logger.info(e)

        self._status.modified()
        return new

    def move_up(self, index):
        if index < len(self):
            next = index - 1
            d = self._data
            d[index], d[next] = d[next], d[index]
            self._status.modified()

    def move_down(self, index):
        if index >= 0:
            prev = index + 1
            d = self._data
            d[index], d[prev] = d[prev], d[index]
            self._status.modified()