# LateralContribution.py -- Pamhyr # Copyright (C) 2023 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # -*- coding: utf-8 -*- import logging from tools import trace, timer, old_pamhyr_date_to_timestamp from Model.Tools.PamhyrDB import SQLSubModel from Model.Except import NotImplementedMethodeError logger = logging.getLogger() class LateralContribution(SQLSubModel): _sub_classes = [] _id_cnt = 0 def __init__(self, id: int = -1, name: str = "", status=None): super(LateralContribution, self).__init__() self._status = status if id == -1: self.id = LateralContribution._id_cnt else: self.id = id self._name = name self._type = "" self._edge = None self._begin_kp = 0.0 self._end_kp = 0.0 self._data = [] self._header = [] self._types = [float, float] LateralContribution._id_cnt = max( LateralContribution._id_cnt + 1, self.id) @classmethod def _sql_create(cls, execute): execute(""" CREATE TABLE lateral_contribution( id INTEGER NOT NULL PRIMARY KEY, name TEXT NOT NULL, type TEXT NOT NULL, tab TEXT NOT NULL, edge INTEGER, begin_kp REAL NOT NULL, end_kp REAL NOT NULL, FOREIGN KEY(edge) REFERENCES river_reach(id) ) """) execute(""" CREATE TABLE lateral_contribution_data( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, ind INTEGER NOT NULL, data0 TEXT NOT NULL, data1 TEXT NOT NULL, lc INTEGER, FOREIGN KEY(lc) REFERENCES lateral_contribution(id) ) """) return cls._create_submodel(execute) @classmethod def _sql_update(cls, execute, version): return True @classmethod def _get_ctor_from_type(cls, t): from Model.LateralContribution.LateralContributionTypes import ( NotDefined, LateralContrib, Rain, Evaporation ) res = NotDefined if t == "LC": res = LateralContrib elif t == "RA": res = Rain elif t == "EV": res = Evaporation return res @classmethod def _sql_load(cls, execute, data=None): new = [] tab = data["tab"] table = execute( "SELECT id, name, type, edge, begin_kp, end_kp " + f"FROM lateral_contribution WHERE tab = '{tab}'" ) for row in table: t = row[2] ctor = cls._get_ctor_from_type(t) lc = ctor( id=row[0], name=row[1], status=data['status'] ) lc.edge = None if row[3] != -1: lc.edge = next(filter(lambda e: e.id == row[3], data["edges"])) lc._begin_kp = row[4] lc._end_kp = row[5] values = execute( "SELECT ind, data0, data1 FROM lateral_contribution_data " + f"WHERE lc = '{lc.id}'" ) # Create dummy data list for _ in values: lc.add(0) # Write data for v in values: ind = v[0] data0 = lc._types[0](v[1]) data1 = lc._types[1](v[2]) # Replace data at pos ind lc._data[ind] = (data0, data1) new.append(lc) return new def _sql_save(self, execute, data=None): tab = data["tab"] execute(f"DELETE FROM lateral_contribution WHERE id = {self.id}") execute(f"DELETE FROM lateral_contribution_data WHERE lc = {self.id}") edge = -1 if self._edge is not None: edge = self._edge.id sql = ( "INSERT INTO " + "lateral_contribution(id, name, type, tab, edge, begin_kp, end_kp) " + "VALUES (" + f"{self.id}, '{self._sql_format(self._name)}', " + f"'{self._sql_format(self._type)}', '{tab}', {edge}, " + f"{self._begin_kp}, {self._end_kp}" + ")" ) execute(sql) ind = 0 for d in self._data: data0 = self._sql_format(str(d[0])) data1 = self._sql_format(str(d[1])) sql = ( "INSERT INTO " + "lateral_contribution_data(ind, data0, data1, lc) " + f"VALUES ({ind}, '{data0}', {data1}, {self.id})" ) execute(sql) ind += 1 return True def __len__(self): return len(self._data) @classmethod def compatibility(cls): return ["liquid", "solid", "suspenssion"] @classmethod def time_convert(cls, data): if type(data) == str and data.count(":") == 3: return old_pamhyr_date_to_timestamp(data) return int(data) @property def name(self): return self._name @name.setter def name(self, name): self._name = name self._status.modified() @property def lctype(self): return self._type @property def edge(self): return self._edge @edge.setter def edge(self, edge): self._edge = edge if edge is not None: self._begin_kp = self._edge.reach.get_kp_min() self._end_kp = self._edge.reach.get_kp_max() self._status.modified() def has_edge(self): return self._edge is not None @property def begin_kp(self): return self._begin_kp @begin_kp.setter def begin_kp(self, begin_kp): if self._edge is None: self._begin_kp = begin_kp else: _min = self._edge.reach.get_kp_min() _max = self._edge.reach.get_kp_max() if _min <= begin_kp <= _max: self._begin_kp = begin_kp self._status.modified() @property def end_kp(self): return self._end_kp @end_kp.setter def end_kp(self, end_kp): if self._edge is None: self._end_kp = end_kp else: _min = self._edge.reach.get_kp_min() _max = self._edge.reach.get_kp_max() if _min <= end_kp <= _max: self._end_kp = end_kp self._status.modified() @property def header(self): return self._header.copy() @property def data(self): return self._data.copy() def get_type_column(self, column): if 0 <= column < 2: return self._types[column] return None @property def _default_0(self): return self._types[0](0) @property def _default_1(self): return self._types[1](0.0) def is_define(self): return self._data is not None def new_from_data(self, header, data): new_0 = self._default_0 new_1 = self._default_1 if len(header) != 0: for i in [0, 1]: for j in range(len(header)): if self._header[i] == header[j]: if i == 0: new_0 = self._types[i](data[j].replace(",", ".")) else: new_1 = self._types[i](data[j].replace(",", ".")) else: new_0 = self._types[0](data[0].replace(",", ".")) new_1 = self._types[1](data[1].replace(",", ".")) return (new_0, new_1) def add(self, index: int): value = (self._default_0, self._default_1) self._data.insert(index, value) self._status.modified() return value def insert(self, index: int, value): self._data.insert(index, value) self._status.modified() def delete_i(self, indexes): self._data = list( map( lambda e: e[1], filter( lambda e: e[0] not in indexes, enumerate(self.data) ) ) ) self._status.modified() def delete(self, els): self._data = list( filter( lambda e: e not in els, self.data ) ) self._status.modified() def sort(self, _reverse=False, key=None): if key is None: self._data.sort(reverse=_reverse) else: self._data.sort(reverse=_reverse, key=key) self._status.modified() def get_i(self, index): return self.data[index] def get_range(self, _range): l = [] for r in _range: l.append(r) return l def _set_i_c_v(self, index, column, value): v = list(self._data[index]) v[column] = self._types[column](value) self._data[index] = tuple(v) self._status.modified() def set_i_0(self, index: int, value): self._set_i_c_v(index, 0, value) def set_i_1(self, index: int, value): self._set_i_c_v(index, 1, value) @timer def convert(self, cls): new = cls(name=self.name, status=self._status) new.edge = self.edge new.begin_kp = self.begin_kp new.end_kp = self.end_kp for i, _ in enumerate(self.data): new.add(i) for i in [0, 1]: for j in [0, 1]: if self._header[i] == new.header[j]: for ind, v in self.data: try: new._set_i_c_v(ind, j, v[i]) except Exception as e: logger.info(e) self._status.modified() return new def move_up(self, index): if index < len(self): next = index - 1 d = self._data d[index], d[next] = d[next], d[index] self._status.modified() def move_down(self, index): if index >= 0: prev = index + 1 d = self._data d[index], d[prev] = d[prev], d[index] self._status.modified()