Newer
Older
# AdisTS.py -- Pamhyr
# Copyright (C) 2023-2024 INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -*- coding: utf-8 -*-
from tools import (
trace, timer, logger_exception,
timestamp_to_old_pamhyr_date,
old_pamhyr_date_to_timestamp,
timestamp_to_old_pamhyr_date_adists
)
from Solver.CommandLine import CommandLineSolver
from Model.Results.ResultsAdisTS import Results
from Model.Results.River.River import River, Reach, Profile
def adists_file_open(filepath, mode):
f = open(filepath, mode)
if "w" in mode:
# Write header
comment = "*"
if ".ST" in filepath:
comment = "#"
f.write(
f"{comment} " +
"This file is generated by PAMHYR, please don't modify\n"
)
return f
class AdisTS(CommandLineSolver):
_type = "adists"
def __init__(self, name):
super(AdisTS, self).__init__(name)
self._type = "adists"
self._cmd_input = ""
self._cmd_solver = "@path @input -o @output"
self._cmd_output = ""
@classmethod
def default_parameters(cls):
lst = super(AdisTS, cls).default_parameters()
lst += [
("adists_implicitation_parameter", "0.5"),
("adists_timestep_screen", "60"),
("adists_timestep_bin", "60"),
("adists_timestep_csv", "60"),
("adists_timestep_mage", "60"),
("adists_initial_concentration", "60"),
]
return lst
def cmd_args(self, study):
lst = super(AdisTS, self).cmd_args(study)
return lst
def input_param(self):
name = self._study.name
return f"{name}.REP"
def log_file(self):
name = self._study.name
return f"{name}.TRA"
def _export_REP_additional_lines(self, study, rep_file):
lines = filter(
lambda line: line.is_enabled(),
study.river.rep_lines.lines
)
for line in lines:
rep_file.write(line.line)
def _export_REP(self, study, repertory, mage_rep, files, qlog, name="0"):
if qlog is not None:
qlog.put("Export REP file")
# Write header
with adists_file_open(
os.path.join(
repertory, f"{name}.REP"
), "w+"
) as f:
f.write(f"NET ../{mage_rep}/{name}.NET\n")
f.write(f"REP ../{mage_rep}/{name}.REP\n")
for file in files:
EXT = file.split('.')[1]
f.write(f"{EXT} {file}\n")
self._export_REP_additional_lines(study, f)
path_mage_net = os.path.join(os.path.abspath(
os.path.join(repertory, os.pardir)
), f"{mage_rep}/net")
path_adists_net = os.path.join(repertory, "net")
if os.path.exists(path_mage_net):
shutil.copytree(path_mage_net, path_adists_net, dirs_exist_ok=True)
@timer
def export(self, study, repertory, qlog=None):
self._study = study
name = study.name.replace(" ", "_")
self.export_additional_files(study, repertory, qlog, name=name)
return True
###########
# RESULTS #
###########
def read_bin(self, study, repertory, results, qlog=None, name="0"):
return
@timer
def results(self, study, repertory, qlog=None, name="0", type_pol=None):
results = Results(
study=study,
solver=self,
repertory=repertory,
name=name,
)
self.read_bin(study, repertory, results, qlog, name=name)
return results
def output_param(self):
name = ""
return f"{name}"
_alph = list(
map(
chr,
chain(
range(48, 58), # 0..9
range(65, 91), # A..Z
range(97, 123) # a..z
)
)
)
_l_alph = len(_alph)
_nodes_cnt = 0
_nodes_names = {}
_nodes_views = set()
def get_reach_name(self, reach):
return f"Reach_{reach.id:>3}".replace(" ", "0")
def get_node_name(self, node):
"""Generate a 3 char name for node
Args:
node: The node
Returns:
A 3 char name string
"""
n = node.id
# if n in self._nodes_names:
# return self._nodes_names[n]
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
name = ""
checked_new = False
while not checked_new:
self._nodes_cnt += 1
nc = self._nodes_cnt
name = "".join(
map(
lambda i: self._alph[i % self._l_alph],
[
int(nc / (self._l_alph * self._l_alph)),
int(nc / self._l_alph),
nc
]
)
)
checked_new = name not in self._nodes_views
self._nodes_views.add(name)
self._nodes_names[n] = name
return name
#################################
# Adis-TS in weak coupling mode #
#################################
# Insert new parameters at specific position
names = list(map(lambda t: t[0], lst))
return lst
##########
# Export #
##########
def cmd_args(self, study):
def _export_POLs(self, study, repertory, qlog=None, name="0"):
files = []
if qlog is not None:
qlog.put("Export POLS files")
pollutants = study.river.Pollutants.Pollutants_List
for pollutant in pollutants:
name = pollutant.name
with adists_file_open(
os.path.join(repertory, f"{name}.POL"), "w+"
) as f:
files.append(f"{name}.POL")
f.write(f"name = {name}\n")
self._export_POL_Characteristics(
study, pollutant._data, f, qlog
)
POL_ICs = next(filter(
lambda ic: ic.pollutant == pollutant.id,
study.river.ic_adists.Initial_Conditions_List
))
self._export_ICs_AdisTS(
study, repertory, POL_ICs, qlog, name
)
POL_BCs = list(filter(
lambda bc: bc.pollutant == pollutant.id,
study.river.boundary_conditions_adists.BCs_AdisTS_List
))
if len(POL_BCs) != 0:
f.write(f"file_cl = {name}.CDT\n")
self._export_BCs_AdisTS(
study, repertory, POL_BCs, qlog, name
)
POL_LAT_Cont = list(filter(
lambda lc: lc.pollutant == pollutant.id,
study.river.lateral_contributions_adists.Lat_Cont_List
))
if len(POL_LAT_Cont) != 0:
f.write(f"file_ald = {name}.ALD\n")
f.write(f"*\n")
self._export_Lat_AdisTS(
study, repertory, POL_LAT_Cont, qlog, name
)
def _export_Lat_AdisTS(self, study, repertory, POL_LC, qlog, POL_name):
if qlog is not None:
qlog.put("Export POL LCs files")
with adists_file_open(
os.path.join(repertory, f"{POL_name}.ALD"), "w+"
) as f:
reach = next(filter(
lambda edge: edge.id == LC.edge, study.river.edges()
)) # .name
reach_name = self.get_reach_name(self, reach)
f.write(f"*temps |débit massique (kg/s)\n")
f.write(f"*---------++++++++++\n")
for LC_data in LC._data:
tmp = timestamp_to_old_pamhyr_date_adists(int(LC_data[0]))
f.write(" ".join((f"{tmp}",
f"{LC_data[1]}\n")))
f.write(f"*\n")
return True
def _export_BCs_AdisTS(self, study, repertory, POL_BC, qlog, POL_name):
if qlog is not None:
qlog.put("Export POL BCs files")
with adists_file_open(os.path.join(
repertory, f"{POL_name}.CDT"), "w+"
) as f:
node = next(filter(
lambda x: x.id == BC.node, study.river._nodes
)) # .name
print("node name in BC:",
node, node.name,
self.get_node_name(node))
node_name = node.name # self.get_node_name(node)
f.write(f"${node_name}\n")
if BC.type == "Concentration":
f.write(f"*temps |concentration\n")
f.write(f"*JJ:HH:MM | (g/L)\n")
f.write(f"*---------++++++++++\n")
else:
f.write(f"*temps |rate\n")
f.write(f"*JJ:HH:MM | (kg/s)\n")
f.write(f"*---------++++++++++\n")
for BC_data in BC._data:
tmp = timestamp_to_old_pamhyr_date_adists(int(BC_data[0]))
f.write(" ".join((f"{tmp}",
f"{BC_data[1]}\n")))
def _export_ICs_AdisTS(self, study, repertory,
POL_IC_default, qlog, POL_name):
if qlog is not None:
qlog.put("Export POL ICs files")
with adists_file_open(os.path.join(
repertory, f"{POL_name}.INI"
), "w+") as f:
f.write(" ".join(("DEFAULT =",
f"{POL_IC_default.concentration}",
f"{POL_IC_default.eg}",
f"{POL_IC_default.em}",
f"{POL_IC_default.ed}\n")))
self._export_ICs_AdisTS_Spec(
study, POL_IC_default._data, f, qlog
)
def _export_ICs_AdisTS_Spec(self, study, pol_ics_spec_data,
f, qlog, name="0"):
f.write(" ".join((f"{ic_spec.name}",
"=",
f"{ic_spec.reach}",
f"{ic_spec.start_rk}",
f"{ic_spec.end_rk}",
f"{ic_spec.concentration}",
f"{ic_spec.eg}",
f"{ic_spec.em}",
f"{ic_spec.ed}",
f"{ic_spec.rate}\n")))
def _export_POL_Characteristics(self, study, pol_data, f, qlog, name="0"):
list_characteristics = ["type", "diametre", "rho", "porosity",
"cdc_riv", "cdc_cas", "apd", "ac", "bc"]
for i in range(len(list_characteristics)):
f.write(f"{list_characteristics[i]} = {pol_data[0][i]}\n")
def _export_D90(self, study, repertory, qlog=None, name="0"):
files = []
if qlog is not None:
qlog.put("Export D90 file")
with adists_file_open(
os.path.join(repertory, f"{name}.D90"), "w+"
) as f:
files.append(f"{name}.D90")
f.write(f"*Diamètres caractéristiques du fond stable\n")
d90AdisTS = study.river.d90_adists.D90_AdisTS_List
self._export_d90_spec(study, d90AdisTS[0]._data, f, qlog)
return files
def _export_d90_spec(self, study, d90_spec_data, f, qlog, name="0"):
for d90_spec in d90_spec_data:
if (d90_spec.name is None
or d90_spec.reach is None
or d90_spec.start_rk is None
or d90_spec.end_rk is None
or d90_spec.d90 is None):
return
edges = study.river.enable_edges()
id_edges = list(map(lambda x: x.id, edges))
id_reach = d90_spec.reach
if id_reach not in id_edges:
return
f.write(" ".join((f"{d90_spec.name}",
"=",
f"{id_reach}",
f"{d90_spec.start_rk}",
f"{d90_spec.end_rk}",
f"{d90_spec.d90}\n")))
def _export_DIF(self, study, repertory, qlog=None, name="0"):
files = []
if qlog is not None:
qlog.put("Export DIF file")
with adists_file_open(
os.path.join(repertory, f"{name}.DIF"), "w+"
) as f:
files.append(f"{name}.DIF")
f.write(f"*Définition des paramètres des fonctions de calcul du\n")
f.write(f"*coefficient de diffusion\n")
difAdisTS = study.river.dif_adists.DIF_AdisTS_List
if difAdisTS[0].method != "generique":
f.write(" ".join((f"defaut = ",
f"{difAdisTS[0].method}",
f"{difAdisTS[0].dif}\n")))
f.write(" ".join((f"defaut ="
f"{difAdisTS[0].method}",
f"{difAdisTS[0].dif}",
f"{difAdisTS[0].b}",
f"{difAdisTS[0].c}\n")))
self._export_dif_spec(study, difAdisTS[0]._data, f, qlog)
return files
def _export_dif_spec(self, study, dif_spec_data, f, qlog, name="0"):
for dif_spec in dif_spec_data:
if (dif_spec.reach is None
or dif_spec.start_rk is None
or dif_spec.end_rk is None
or dif_spec.dif is None
or dif_spec.b is None
or dif_spec.c is None):
return
edges = study.river.enable_edges()
id_edges = list(map(lambda x: x.id, edges))
id_reach = dif_spec.reach
if id_reach not in id_edges:
return
if dif_spec.method != "generique":
f.write(" ".join((f"{dif_spec.method}",
"=",
f"{id_reach}",
f"{dif_spec.start_rk}",
f"{dif_spec.end_rk}",
f"{dif_spec.dif}\n")))
f.write(" ".join((f"{dif_spec.method}",
f"=" f"{id_reach}",
f"{dif_spec.start_rk}",
f"{dif_spec.end_rk}",
f"{dif_spec.dif}",
f"{dif_spec.b}",
f"{dif_spec.c}\n")))
def _export_NUM(self, study, repertory, qlog=None, name="0"):
dict_names = {"init_time": "start_date",
"final_time": "end_date",
"timestep": "dt0",
"implicitation_parameter": "theta",
"timestep_screen": "dtscr",
"timestep_bin": "dtbin",
"timestep_csv": "dtcsv",
"timestep_mage": "dtMage",
"initial_concentration": "c_initiale"}
files = []
if qlog is not None:
qlog.put("Export NUM file")
with adists_file_open(
os.path.join(repertory, f"{name}.NUM"), "w+"
) as f:
files.append(f"{name}.NUM")
params = study.river.get_params(self.type).parameters
for p in params:
name = p.name\
.replace("all_", "")\
.replace("adists_", "")
value = p.value
logger.debug(
f"export: NUM: {name}: {value} ({p.value})"
)
if name != "command_line_arguments":
f.write(f"{dict_names[name]} = {value}\n")
for outputrk in outputrks:
self._export_outputrk(study, outputrk, f, qlog)
def _export_outputrk(self, study, outputrk, f, qlog, name="0"):
if (outputrk.reach is None
or outputrk.rk is None
or outputrk.title is None):
id_reach = outputrk.reach
rk = outputrk.rk
title = outputrk.title
@timer
def read_bin(self, study, repertory, results, qlog=None, name="0"):
repertory_results = os.path.join(repertory, "resultats")
files_bin_names = [el.split("/")[-1]
for el in glob.glob(repertory_results+"/*.bin")]
print("files names resultats: ", files_bin_names)
ifilename = os.path.join(repertory_results, files_bin_names[0])
logger.info(f"read_bin: Start reading '{ifilename}' ...")
print("reading ", ifilename)
with open(ifilename, 'rb') as f:
# header
# first line
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
data = np.fromfile(f, dtype=np.int32, count=3)
ibmax = data[0] # number of reaches
ismax = data[1] # total number of cross sections
kbl = data[2] * -1 # block size for .BIN header
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
ibu = np.fromfile(f, dtype=np.int32, count=ibmax)
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
data = np.fromfile(f, dtype=np.int32, count=2 * ibmax)
is1 = np.zeros(ibmax, dtype=np.int32)
is2 = np.zeros(ibmax, dtype=np.int32)
print("nombre de biefs : ", ibmax)
logger.debug(f"read_bin: nb_reach = {ibmax}")
logger.debug(f"read_bin: nb_profile = {ismax}")
results.set("nb_reach", f"{ibmax}")
results.set("nb_profile", f"{ismax}")
reachs = []
iprofiles = {}
reach_offset = {}
for i in range(ibmax):
# Add results reach to reach list
r = results.river.add(i)
reachs.append(r)
is1[i] = data[2 * i] - 1 # first section of reach i
is2[i] = data[2 * i + 1] - 1 # last section of reach i
key = (is1[i], is2[i])
iprofiles[key] = r
reach_offset[r] = is1[i]
logger.debug(f"read_bin: iprofiles = {iprofiles}")
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
# fourth line
pk = np.zeros(ismax, dtype=np.float32)
for k in range(0, ismax, kbl):
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
pk[k:min(k + kbl, ismax)] = np.fromfile(f,
dtype=np.float32,
count=min(
k + kbl, ismax
) - k)
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
zmin_OLD = np.fromfile(f, dtype=np.float32, count=1)[0]
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
# sixth line
zf = np.zeros(ismax, dtype=np.float32)
z = np.zeros(ismax * 3, dtype=np.float32)
for k in range(0, ismax, kbl):
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
z[3 * k:3 * min(k + kbl, ismax)] = \
np.fromfile(f,
dtype=np.float32,
count=3 * (min(k + kbl, ismax) - k)
)
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
zf = [z[i * 3] for i in range(ismax)]
# seventh line (useless)
for k in range(0, ismax, kbl):
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
zero = np.fromfile(f, dtype=np.int32, count=ismax)
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
def ip_to_r(i):
return iprofiles[
next(
filter(
lambda k: k[0] <= i <= k[1],
iprofiles
)
def ip_to_ri(r, i): return i - reach_offset[r]
path_files = map(lambda file: os.path.join(
repertory_results, file), files_bin_names)
data_tmp = {}
for file_bin in path_files:
key_pol = file_bin.split("/")[-1][0:-4]
data_tmp[key_pol] = {}
with open(file_bin, 'rb') as f:
# header
# first line
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
data = np.fromfile(f, dtype=np.int32, count=3)
ibmax = data[0] # number of reaches
ismax = data[1] # total number of cross sections
kbl = data[2] * -1 # block size for .BIN header
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
ibu = np.fromfile(f, dtype=np.int32, count=ibmax)
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
data = np.fromfile(f, dtype=np.int32, count=2 * ibmax)
is1 = np.zeros(ibmax, dtype=np.int32)
is2 = np.zeros(ibmax, dtype=np.int32)
for i in range(ibmax):
# first section of reach i (FORTRAN numbering)
is1[i] = data[2 * i]
# last section of reach i (FORTRAN numbering)
is2[i] = data[2 * i + 1]
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
# fourth line
pk = np.zeros(ismax, dtype=np.float32)
for k in range(0, ismax, kbl):
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
pk[k:min(k + kbl, ismax)] = np.fromfile(
f, dtype=np.float32, count=min(k + kbl, ismax) - k
)
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
zmin_OLD = np.fromfile(f, dtype=np.float32, count=1)[0]
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
# sixth line
zf = np.zeros(ismax, dtype=np.float32)
z = np.zeros(ismax * 3, dtype=np.float32)
for k in range(0, ismax, kbl):
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
z[3 * k:3 * min(k + kbl, ismax)] = np.fromfile(
f, dtype=np.float32,
count=3 * (min(k + kbl, ismax) - k)
)
# z[i*3+1] and z[i*3+2] are useless
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
zf = [z[i * 3] for i in range(ismax)]
# seventh line (useless)
for k in range(0, ismax, kbl):
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
zero = np.fromfile(f, dtype=np.int32, count=ismax)
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
while data.size > 0:
ismax = np.fromfile(f, dtype=np.int32, count=1)[0]
t = np.fromfile(f, dtype=np.float64, count=1)[0]
data_tmp[key_pol][t] = {}
c = np.fromfile(f, dtype=np.byte, count=1)
# possible values :
# sediment : C, G, M, D, L, N, R
phys_var = bytearray(c).decode()
data_tmp[key_pol][t][phys_var] = {}
real_data = np.fromfile(f, dtype=np.float32, count=ismax)
data = np.fromfile(f, dtype=np.int32, count=1) # (end)
data_tmp[key_pol][t][phys_var] = real_data
data = np.fromfile(f, dtype=np.int32, count=1) # (start)
pollutants_keys = list(data_tmp.keys())
timestamps_keys = list(data_tmp[pollutants_keys[0]].keys())
phys_data_names = list(data_tmp[pollutants_keys[0]]
[timestamps_keys[0]].keys())
# ##print("pol keys: ", pollutants_keys)
# ##print("t keys: ", timestamps_keys)
# ##print("phys var: ", phys_data_names)
# ##print("phys data names mass zero:",
# list(data_tmp[pollutants_keys[1]][timestamps_keys[0]].keys()))
# print("set timestamps keys: ", set(timestamps_keys))
# print("isma")
# ##print("iprofiles: ", iprofiles)
for r, reach in enumerate(reachs):
for i in range(is1[r]-1, is2[r]):
p_i = ip_to_ri(reach, i)
for t_data in timestamps_keys:
pol_view = []
for pol in pollutants_keys:
pol_view.append(tuple(list(map(
lambda data_el: data_el[i],
list(data_tmp[pol][t_data].values())
))))
reach.set(p_i, t_data, "pols", pol_view)
results.set("timestamps", set(timestamps_keys))
@timer
def results(self, study, repertory, qlog=None, name=None):
self._study = study
if name is None:
name = study.name.replace(" ", "_")
print("adist ts results repertory: ", repertory)
results = super(AdisTSwc, self).results(study,
repertory,
qlog,
name=name)
def export_func_dict(self):
return [
self._export_NUM,
self._export_DIF,
self._export_D90,
self._export_POLs,
def export(self, study, repertory, mage_rep, qlog=None, name="0"):
print("cmd solver adistswc : ", self._cmd_solver)
self._study = study
name = study.name.replace(" ", "_")
# Generate files
files = []
try:
for func in self.export_func_dict():
files = files + func(study, repertory, qlog, name=name)
self.export_additional_files(study, repertory, qlog, name=name)
self._export_REP(study, repertory, mage_rep,
files, qlog, name=name)
return True
except Exception as e:
logger.error(f"Failed to export study to {self._type}")
logger_exception(e)
return False