Newer
Older
# Mage.py -- Pamhyr
# Copyright (C) 2023 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
import logging
import numpy as np
from Solver.ASolver import AbstractSolver
from Checker.Mage import MageNetworkGraphChecker
from Model.Results.Results import Results
from Model.Results.River.River import River, Reach, Profile
logger = logging.getLogger()
def mage_file_open(filepath, mode):
f = open(filepath, mode)
if "w" in mode:
# Write header
f.write("* This file is generate by PAMHYR, please don't modify\n")
return f
class Mage(AbstractSolver):
_type = "mage"
def __init__(self, name):
super(Mage, self).__init__(name)
self._type = "mage"
self._cmd_input = ""
self._cmd_solver = "@path @input -o @output"
self._cmd_output = ""
@classmethod
def default_parameters(cls):
lst = super(Mage, cls).default_parameters()
lst += [
("mage_min_timestep", "1.0"),
("mage_timestep_tra", "3600"),
("mage_timestep_bin", "0"),
# ("mage_timestep_melissa", "0"),
("mage_continuity_discretization", "S"),
("mage_qsj_discretization", "B"),
("mage_stop_criterion_iterations", "R"),
("mage_iter_type", "0"),
("mage_smooth_coef", "0"),
("mage_cfl_max", "-1."),
("mage_min_height", "0.1"),
("mage_max_niter", "10"),
("mage_timestep_reduction_factor", "2"),
("mage_precision_reduction_factor_Z", "1"),
("mage_precision_reduction_factor_Q", "1"),
("mage_niter_max_precision", "99"),
("mage_niter_before_switch", "99"),
("mage_max_froude", "1.5"),
("mage_diffluence_node_height_balance", "-1"),
("mage_compute_reach_volume_balance", "y"),
("mage_max_reach_volume_balance", "0.001"),
("mage_min_reach_volume_to_check", "1000.0"),
]
return lst
@classmethod
def checkers(cls):
lst = [
MageNetworkGraphChecker(connectivity = True),
MageNetworkGraphChecker(connectivity = False)
]
return lst
##########
# Export #
##########
def input_param(self):
return "0.REP"
def log_file(self):
return "0.TRA"
def _export_ST(self, study, repertory, qlog):
files = []
if qlog is not None:
qlog.put("Export ST file")
# Write header
edges = study.river.edges()
edges = list(
filter(
lambda e: e.is_enable(),
edges
)
)
for edge in edges:
name = edge.name.replace(" ", "_")
if edge._name == "":
name = f"Reach_{edge.id}"
with mage_file_open(os.path.join(repertory, f"{name}.ST"), "w+") as f:
Pierre-Antoine Rouby
committed
cnt_num = 1
for profile in edge.reach.profiles:
Pierre-Antoine Rouby
committed
num = f"{cnt_num:>6}"
c1 = f"{profile.code1:>6}"
c2 = f"{profile.code2:>6}"
t = f"{len(profile.points):>6}"
kp = f"{profile.kp:>13.4f}"
Pierre-Antoine Rouby
committed
pname = profile.name
if profile.name == "":
pname = f"p{profile.id:>3}".replace(" ", "p")
name = f"{pname}"
Pierre-Antoine Rouby
committed
sediment = ""
if profile.sl is not None:
if not any(filter(lambda f: ".GRA" in f, files)):
files.append("0.GRA")
nl = len(profile.sl)
sediment = f" {nl:>3}"
for l in profile.sl.layers:
sediment += f" {l.height:>10} {l.d50:>10} {l.sigma:>10} {l.critical_constraint:>10}"
f.write(f"{num}{c1}{c2}{t}{kp} {name} {sediment}\n")
cnt_num += 1
for point in profile.points:
x = f"{point.x:>13.4f}"
y = f"{point.y:>13.4f}"
z = f"{point.z:>13.4f}"
Pierre-Antoine Rouby
committed
n = f"{point.name:<3}"
Pierre-Antoine Rouby
committed
sediment = ""
prev = point.z
if point.sl is not None:
nl = len(point.sl)
sediment = f"{nl:>3}"
for l in point.sl.layers:
prev = round(prev - l.height, 5)
sediment += f" {prev:>10} {l.d50:>10} {l.sigma:>10} {l.critical_constraint:>10}"
f.write(f"{x}{y}{z} {n} {sediment}\n")
f.write(f" 999.9990 999.9990 999.9990\n")
def _export_BC(self, t, bounds, repertory, qlog):
if len(bounds) == 0:
return files
if qlog is not None:
qlog.put(f"Export {t} file")
with mage_file_open(os.path.join(repertory, f"0.{t}"), "w+") as f:
for bound in bounds:
name = f"{bound.node.id:3}".replace(" ", "x")
f.write(f"* {bound.node.name} ({name}) {bound.bctype}\n")
f.write(f"${name}\n")
header = bound.header
f.write(f"*{header[0]:>9}|{header[1]:>10}\n")
for d in bound.data:
f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n")
def _export_bound_cond(self, study, repertory, qlog):
files = []
lst = study.river.boundary_condition
AVA = []
HYD = []
LIM = []
Pierre-Antoine Rouby
committed
QSO = []
for tab in ["liquid", "solid", "suspenssion"]:
for bound in lst.get_tab(tab):
if bound.bctype == "ZD":
AVA.append(bound)
elif bound.bctype == "TD" or bound.bctype == "PC":
HYD.append(bound)
elif bound.bctype == "TZ":
LIM.append(bound)
Pierre-Antoine Rouby
committed
elif bound.bctype == "SL":
QSO.append(bound)
files = files + self._export_BC("AVA", AVA, repertory, qlog)
files = files + self._export_BC("HYD", HYD, repertory, qlog)
files = files + self._export_BC("LIM", LIM, repertory, qlog)
Pierre-Antoine Rouby
committed
files = files + self._export_QSO(QSO, repertory, qlog)
# @timer
# def _export_LC(self, lateral, repertory, qlog):
# files = []
# if qlog is not None:
# qlog.put(f"Export LAT file")
# with mage_file_open(os.path.join(repertory, f"0.LAT"), "w+") as f:
# files.append(f"0.LAT")
# name = f"{lateral.node.id:3}".replace(" ", "x")
# f.write(f"* {lateral.node.name} ({name}) {lateral.bctype}\n")
# f.write(f"${name}\n")
# header = lateral.header
# f.write(f"*{header[0]:>9}|{header[1]:>10}\n")
# for d in lateral.data:
# f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n")
# return files
# @timer
# def _export_lateral_contrib(self, study, repertory, qlog):
# files = []
# lst = study.river.lateral_contribution
# for tab in ["liquid", "solid", "suspenssion"]:
# for lateral in lst.get_tab(tab):
# files = files + self._export_LC(lateral, repertory, qlog)
# return files
def _export_RUG(self, study, repertory, qlog):
files = []
if qlog is not None:
qlog.put("Export RUG file")
# Write header
with mage_file_open(os.path.join(repertory, "0.RUG"), "w+") as f:
edges = list(
filter(
lambda e: e.is_enable(),
edges
)
)
id = 1
frictions = edge.frictions
for friction in frictions.frictions:
bkp = f"{friction.begin_kp:>10.3f}"
ekp = f"{friction.end_kp:>10.3f}"
# if friction.begin_kp != friction.end_kp:
strickler = friction.begin_strickler
coef_1 = f"{strickler.minor:>10.3f}"
coef_2 = f"{strickler.medium:>10.3f}"
f.write(f"K{num} {bkp}{ekp}{coef_1}{coef_2}\n")
return files
@timer
def _export_INI(self, study, repertory, qlog):
files = []
if qlog is not None:
qlog.put("Export INI file")
# Write header
with mage_file_open(os.path.join(repertory, "0.INI"), "w+") as f:
has_ini = False
reachs = study.river.edges()
reachs = list(
filter(
lambda e: e.is_enable(),
reachs
)
)
# TODO put real date...
f.write(f"$ date en minutes : 0.00\n")
f.write(f"* IB IS discharge elevation kp\n")
for reach in reachs:
cond = study.river.initial_conditions.get(reach)
if len(data) == 0:
continue
has_ini = True
id_sec = 1
for d in data:
IR = f"{id:>3}"
IS = f"{id_sec:>3}"
discharge = f"{d['discharge']:>10.5f}"
elevation = f"{d['elevation']:>11.6f}"
kp = f"{d['kp']:>9.2f}"
f.write(f" {IR} {IS} {discharge}{elevation} {kp}\n")
id_sec += 1
if has_ini:
files.append("0.INI")
def _export_REP(self, study, repertory, files, qlog):
if qlog is not None:
qlog.put("Export REP file")
# Write header
with mage_file_open(os.path.join(repertory, f"0.REP"), "w+") as f:
f.write("confirmation=non\n")
for file in files:
EXT = file.split('.')[1]
Pierre-Antoine Rouby
committed
if EXT not in ["ST", "GRA"]:
f.write(f"{EXT} {file}\n")
f.write("* OUTPUT\n")
f.write(f"TRA 0.TRA\n")
f.write(f"BIN 0.BIN\n")
Pierre-Antoine Rouby
committed
for file in files:
EXT = file.split('.')[1]
if EXT in ["GRA"]:
f.write(f"{EXT} {file}\n")
@timer
def export(self, study, repertory, qlog = None):
self._export_ST(study, repertory, qlog)
return True
###########
# RESULTS #
###########
def read_bin(self, study, repertory, results, qlog = None):
return
@timer
def results(self, study, repertory, qlog = None):
results = Results(study = study)
self.read_bin(study, repertory, results, qlog)
return results
##########
# MAGE 7 #
##########
class Mage7(Mage):
_type = "mage7"
def __init__(self, name):
super(Mage7, self).__init__(name)
self._type = "mage7"
@classmethod
def default_parameters(cls):
lst = super(Mage7, cls).default_parameters()
##########
# MAGE 8 #
##########
class Mage8(Mage):
_type = "mage8"
def __init__(self, name):
super(Mage8, self).__init__(name)
self._type = "mage8"
@classmethod
def default_parameters(cls):
lst = super(Mage8, cls).default_parameters()
# Insert new parameters at specific position
names = list(map(lambda t: t[0], lst))
i = names.index("mage_precision_reduction_factor_Q")
lst.insert(i+1, ("mage_precision_reduction_factor_r", "1"))
##########
# Export #
##########
@timer
def _export_PAR(self, study, repertory, qlog = None):
if qlog is not None:
qlog.put("Export PAR file")
with mage_file_open(os.path.join(repertory, "0.PAR"), "w+") as f:
params = study.river.get_params(self.type).parameters
for p in params:
name = p.name\
.replace("all_", "")\
.replace("mage_", "")
value = p.value
f.write(f"{name} {value}\n")
@timer
def _export_NET(self, study, repertory, qlog = None):
if qlog is not None:
qlog.put("Export NET file")
with mage_file_open(os.path.join(repertory, "0.NET"), "w+") as f:
edges = list(
filter(
lambda e: e.is_enable(),
edges
)
)
for e in edges:
name = e.name.replace(" ", "_")
if e._name == "":
name = f"Reach_{e.id}"
id = f"Bief_{e.id+1}"
n1 = f"{e.node1.id:3}".replace(" ", "x")
n2 = f"{e.node2.id:3}".replace(" ", "x")
f.write(f"{id} {n1} {n2} {file}\n")
Pierre-Antoine Rouby
committed
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
@timer
def _export_QSO(self, bounds, repertory, qlog):
files = []
if len(bounds) == 0:
return files
if qlog is not None:
qlog.put(f"Export QSO file")
with mage_file_open(os.path.join(repertory, f"0.QSO"), "w+") as f:
files.append(f"0.QSO")
for bound in bounds:
name = f"{bound.node.id:3}".replace(" ", "x")
f.write(f"* {bound.node.name} ({name}) {bound.bctype}\n")
f.write(f"${name}\n")
header = bound.header
f.write(f"*{header[0]:>9}|{header[1]:>10}\n")
for d in bound.data:
f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n")
return files
@timer
def export(self, study, repertory, qlog = None):
Pierre-Antoine Rouby
committed
files = self._export_ST(study, repertory, qlog)
files = files + self._export_PAR(study, repertory, qlog)
files = files + self._export_NET(study, repertory, qlog)
files = files + self._export_bound_cond(study, repertory, qlog)
files = files + self._export_RUG(study, repertory, qlog)
files = files + self._export_INI(study, repertory, qlog)
self._export_REP(study, repertory, files, qlog)
###########
# RESULTS #
###########
def read_bin(self, study, repertory, results, qlog = None):
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
with mage_file_open(os.path.join(repertory, f"0.BIN"), "r") as f:
newline = lambda: np.fromfile(f, dtype=np.int32, count=1)
endline = lambda: np.fromfile(f, dtype=np.int32, count=1)
read_int = lambda size: np.fromfile(f, dtype=np.int32, count=size)
read_float = lambda size: np.fromfile(f, dtype=np.float32, count=size)
read_float64 = lambda size: np.fromfile(f, dtype=np.float64, count=size)
# Meta data (1st line)
newline()
data = read_int(3)
nb_reach = data[0]
nb_profile = data[1]
mage_version = data[2]
logger.debug(f"read_bin: nb_reach = {nb_reach}")
logger.debug(f"read_bin: nb_profile = {nb_profile}")
logger.debug(f"read_bin: mage_version = {mage_version}")
if mage_version <= 80:
msg = (
"Read BIN files: " +
f"Possible incompatible mage version '{mage_version}', " +
"please check your solver configuration..."
)
logger.warning(msg)
if qlog is not None:
qlog.put("[WARNING] " + msg)
results.set("solver_version", f"Mage8 ({mage_version})")
results.set("nb_reach", f"{nb_reach}")
results.set("nb_profile", f"{nb_profile}")
endline()
# Reach information (2nd line)
newline()
reachs = []
iprofiles = {}
reach_offset = {}
data = read_int(2*nb_reach)
for i in range(nb_reach):
# Add results reach to reach list
r = results.river.add(i)
reachs.append(r)
# ID of first and last reach profiles
i1 = data[2*i] - 1
i2 = data[2*i+1] - 1
# Add profile id correspondance to reach
key = (i1, i2)
iprofiles[key] = r
# Profile ID offset
reach_offset[r] = i1
logger.debug(f"read_bin: iprofiles = {iprofiles}")
endline()
# X (3rd line)
newline()
_ = read_float(nb_profile)
endline()
# Z and Y (4th line)
newline()
_ = read_float(3*nb_profile)
endline()
# Data
newline()
ip_to_r = lambda i: iprofiles[
next(
filter(
lambda k: k[0] <= i <= k[1],
iprofiles
)
)
]
ip_to_ri = lambda r, i: i - reach_offset[r]
ts = set()
end = False
while not end:
n = read_int(1)[0]
timestamp = read_float64(1)[0]
key = bytearray(np.fromfile(f, dtype=np.byte, count=1)).decode()
data = read_float(n)
logger.debug(f"read_bin: timestamp = {timestamp} sec")
ts.add(timestamp)
if key in ["Z", "Q"]:
for i, d in enumerate(data):
# Get reach corresponding to profile ID
reach = ip_to_r(i)
# Get profile id in reach
ri = ip_to_ri(reach, i)
# Set data for profile RI
reach.set(ri, timestamp, key, d)
endline()
end = newline().size <= 0
logger.info(f"read_bin: ... end with {len(ts)} timestamp read")