# Study.py -- Pamhyr Study class
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import os
import shutil
import logging
from datetime import datetime

from tools import timer, timestamp

from Model.Tools.PamhyrDB import SQLModel
from Model.Saved import SavedStatus
from Model.Serializable import Serializable
from Model.Except import NotImplementedMethodeError

from Model.River import River

from Checker.Study import *

logger = logging.getLogger()


class Study(SQLModel):
    """Pamhyr study: general metadata plus one River, stored via SQLModel.

    The metadata (version, name, description, time system, dates) lives
    in an ``info`` key/value table; the river data is delegated to the
    ``River`` sub-model.
    """

    _sub_classes = [
        River,
    ]

    def __init__(self, filename=None, init_new=True):
        """Build a study.

        Args:
            filename: Path of the study database file, or None.
            init_new: When True, create an empty River; when False, open
                the existing database file ``filename`` instead (the
                river is then expected to be filled in by ``_load``).
        """
        # Metadata
        self._version = "0.0.9"
        self.creation_date = datetime.now()
        self.last_modification_date = datetime.now()
        self.last_save_date = datetime.now()

        self._filename = filename
        super(Study, self).__init__(filename=filename)

        # Shared modified/saved flag, also handed to the river.
        self.status = SavedStatus()

        # Study general information
        self._name = ""
        self.description = ""

        # Time system
        self._time_system = "time"
        self._date = datetime.fromtimestamp(0)

        if init_new:
            # Study data
            self._river = River(status=self.status)
        else:
            self._init_db_file(filename, is_new=False)

        # Counter used to suffix the backup copies made by save().
        self._old_save_id = 0

    @classmethod
    def checkers(cls):
        """Return the list of checker instances applied to a study."""
        lst = [
            StudyNetworkReachChecker(),
            StudyGeometryChecker(),
            # DummyOK(),
            # DummyWARNING(),
            # DummyERROR(),
        ]

        return lst

    @property
    def river(self):
        """The River object holding the study data."""
        return self._river

    @property
    def is_saved(self):
        """Whether the status object reports the study as saved."""
        return self.status.is_saved()

    def save(self, progress=None):
        """Back up the current database file, then write the study.

        On the first save of this object the ``_PAMHYR_/__old__``
        directory next to the study file is (re)created. If the study
        file already exists and is not itself a ``.backup`` file, it is
        copied there with a numeric suffix before being rewritten.

        Args:
            progress: Optional zero-argument callable invoked after each
                save step.
        """
        # Save a copy of database
        fdir, fname = os.path.split(self.filename)

        if self._old_save_id == 0:
            old_dir = os.path.join(fdir, "_PAMHYR_", "__old__")
            if os.name == "nt":
                old_dir = old_dir.replace("/", "\\")

            try:
                os.makedirs(old_dir)
            except FileExistsError:
                # Start from an empty backup directory.
                shutil.rmtree(old_dir)
                os.makedirs(old_dir)
            except Exception as e:
                logger.error(e)

        is_new = False

        fname = fname + "." + str(self._old_save_id)
        if os.path.exists(self.filename) and ".backup" not in self.filename:
            filename = os.path.join(fdir, "_PAMHYR_", "__old__", fname)
            logger.debug(f"Backup previous version copy: {filename}")
            shutil.copy(self.filename, filename)
            self._old_save_id += 1

            is_new = True

        if ".backup" in self.filename:
            is_new = True

        self._init_db_file(self.filename, is_new=is_new)
        self.commit()

        # Save
        self.last_save_date = datetime.now()
        self._save(progress=progress)
        self.status.save()

    @property
    def name(self):
        """Study name."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = str(name)
        self.status.modified()

    @property
    def filename(self):
        """Path of the study database file (may be None)."""
        return self._filename

    @filename.setter
    def filename(self, filename):
        if filename is None:
            self._filename = None
            self.status.modified()
            return

        if self._filename is not None and self._filename != "":
            # A file was already attached: only the path changes.
            self._filename = str(filename)
            self.status.modified()
            return

        # First file name given to this study: initialise a new database.
        self._filename = str(filename)
        self._init_db_file(filename, is_new=True)
        self.status.modified()

    @property
    def time_system(self):
        """Current time system, either "time" or "date"."""
        return self._time_system

    def use_time(self):
        """Switch the study to the "time" time system."""
        self._time_system = "time"
        self.status.modified()

    def use_date(self, date: datetime):
        """Switch the study to the "date" time system.

        Args:
            date: Value stored as the study date.
        """
        self._time_system = "date"
        self._date = date
        self.status.modified()

    @property
    def date(self):
        """Study date."""
        return self._date

    @date.setter
    def date(self, timestamp):
        self._date = timestamp
        self.status.modified()

    @classmethod
    def new(cls, name, description, date=None):
        """Create a new in-memory study.

        Args:
            name: Study name.
            description: Study description.
            date: Optional study date; when given, the study uses the
                "date" time system.

        Returns:
            The new Study instance.
        """
        me = cls()
        me.name = name
        me.description = description

        if date is not None:
            # use_date() requires the date and stores it itself.
            me.use_date(date)

        return me

    @classmethod
    def open(cls, filename):
        """Load and return the study stored in ``filename``."""
        me = cls._load(filename)
        return me

    #######
    # SQL #
    #######

    # NOTE(review): every statement below interpolates values into the
    # SQL text; escaping presumably relies on _db_format -- confirm.

    def _db_insert_into_info(self, key, value, commit=False):
        """Insert one ``key``/``value`` row into the ``info`` table."""
        self.execute(
            "INSERT INTO info VALUES " +
            f"('{key}', '{self._db_format(value)}')",
            commit=commit
        )

    def _create(self):
        """Create the ``info`` table, fill it, and create the sub-models."""
        # Info (metadata)
        self.execute(
            "CREATE TABLE info(key TEXT NOT NULL UNIQUE, value TEXT NOT NULL)")
        self.execute(
            "INSERT INTO info VALUES ('version', " +
            f"'{self._db_format(self._version)}')",
            commit=True
        )
        self.execute("INSERT INTO info VALUES ('name', '')")
        self.execute("INSERT INTO info VALUES ('description', '')")
        self.execute(
            f"INSERT INTO info VALUES ('time_system', '{self._time_system}')")
        # Dates go through timestamp(), like in _save(), so that _load()
        # can parse them back with float().
        self.execute(
            "INSERT INTO info VALUES ('date', " +
            f"'{timestamp(self._date)}')"
        )
        self.execute(
            "INSERT INTO info VALUES ('creation_date', " +
            f"'{timestamp(self.creation_date)}')"
        )
        self.execute(
            "INSERT INTO info VALUES ('last_save_date', " +
            f"'{timestamp(self.last_save_date)}')"
        )

        self._create_submodel()
        self.commit()

    def _update(self):
        """Bring the database to the current version.

        Returns:
            True when the database is already at, or was updated to,
            the current version.

        Raises:
            NotImplementedMethodeError: If the sub-models update failed.
        """
        version = self.execute("SELECT value FROM info WHERE key='version'")

        logger.debug(f"{version[0]} == {self._version}")
        if version[0] == self._version:
            return True

        logger.debug("Update database")
        if self._update_submodel(version[0]):
            self.execute(
                f"UPDATE info SET value='{self._version}' WHERE key='version'"
            )
            return True

        logger.info("TODO: update failed")
        raise NotImplementedMethodeError(self, self._update)

    @classmethod
    def _load(cls, filename):
        """Build a study from the database file ``filename``."""
        new = cls(init_new=False, filename=filename)

        # TODO: Load metadata
        # NOTE(review): assigning through the ``name`` setter calls
        # status.modified() during load -- confirm this is intended.
        new.name = new.execute("SELECT value FROM info WHERE key='name'")[0]
        new.description = new.execute(
            "SELECT value FROM info WHERE key='description'")[0]
        new._time_system = new.execute(
            "SELECT value FROM info WHERE key='time_system'")[0]
        new._date = datetime.fromtimestamp(
            float(new.execute("SELECT value FROM info WHERE key='date'")[0])
        )

        new.creation_date = datetime.fromtimestamp(
            float(new.execute(
                "SELECT value FROM info WHERE key='creation_date'")[0])
        )
        new.last_save_date = datetime.fromtimestamp(
            float(new.execute(
                "SELECT value FROM info WHERE key='last_save_date'")[0])
        )

        # Load river data
        new._river = River._db_load(
            lambda sql: new.execute(
                sql,
                fetch_one=False,
                commit=True
            ),
            data={"status": new.status}
        )

        return new

    def _save(self, progress=None):
        """Write the metadata rows, then the river, and commit.

        Args:
            progress: Optional zero-argument callable invoked after each
                metadata row; also handed to the sub-model save as
                ``data``.
        """
        progress = progress if progress is not None else lambda: None

        # One UPDATE per row; _count() accounts for these six steps.
        info = (
            ("name", self._db_format(self.name)),
            ("description", self._db_format(self.description)),
            ("time_system", self._time_system),
            ("date", timestamp(self._date)),
            ("creation_date", timestamp(self.creation_date)),
            ("last_save_date", timestamp(self.last_save_date)),
        )
        for key, value in info:
            self.execute(
                f"UPDATE info SET value='{value}' WHERE key='{key}'"
            )
            progress()

        self._save_submodel([self._river], data=progress)
        self.commit()

    def sql_save_request_count(self):
        """Return the number of save steps (see ``_count``)."""
        return self._count()

    def _count(self):
        """Count the save steps: the river's plus the six info rows."""
        cnt = self._save_count([self._river])
        logger.debug(cnt)
        return cnt + 6

    def close(self):
        """Close db connection

        Returns:
            Nothing.
        """
        self._close()