# Mage.py -- Pamhyr # Copyright (C) 2023-2024 INRAE # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # -*- coding: utf-8 -*- import os import logging import numpy as np from tools import timer, trace from Solver.CommandLine import CommandLineSolver from Checker.Mage import ( MageNetworkGraphChecker, MageGeometryGuideLineChecker, ) from Model.Results.Results import Results from Model.Results.River.River import River, Reach, Profile logger = logging.getLogger() def mage_file_open(filepath, mode): f = open(filepath, mode) if "w" in mode: # Write header comment = "*" if ".ST" in filepath: comment = "#" f.write( f"{comment} " + "This file is generated by PAMHYR, please don't modify\n" ) return f class Mage(CommandLineSolver): _type = "mage" def __init__(self, name): super(Mage, self).__init__(name) self._type = "mage" self._cmd_input = "" self._cmd_solver = "@path @input -o @output" self._cmd_output = "" @classmethod def default_parameters(cls): lst = super(Mage, cls).default_parameters() lst += [ ("mage_min_timestep", "1.0"), ("mage_timestep_tra", "3600"), ("mage_timestep_bin", "0"), # ("mage_timestep_melissa", "0"), ("mage_implicitation", "0.70"), ("mage_continuity_discretization", "S"), ("mage_qsj_discretization", "B"), ("mage_stop_criterion_iterations", "R"), ("mage_iteration_type", "0"), ("mage_smooth_coef", "0"), ("mage_cfl_max", "-1."), ("mage_min_height", "0.1"), ("mage_max_niter", "10"), ("mage_timestep_reduction_factor", "2"), ("mage_precision_reduction_factor_Z", "1"), ("mage_precision_reduction_factor_Q", "1"), ("mage_niter_max_precision", "99"), ("mage_niter_before_switch", "99"), ("mage_max_froude", "1.5"), ("mage_diffluence_node_height_balance", "-1"), ("mage_compute_reach_volume_balance", "y"), ("mage_max_reach_volume_balance", "0.001"), ("mage_min_reach_volume_to_check", "1000.0"), ("mage_init_internal", "n"), ] return lst @classmethod def checkers(cls): lst = [ MageNetworkGraphChecker(connectivity=True), MageNetworkGraphChecker(connectivity=False), MageGeometryGuideLineChecker(), ] return lst ########## # Export # ########## def cmd_args(self, study): lst = super(Mage, self).cmd_args(study) lst.append("-r") return lst def input_param(self): name = self._study.name return f"{name}.REP" def output_param(self): name = self._study.name return f"{name}.BIN" def log_file(self): name = self._study.name return f"{name}.TRA" @timer def _export_ST(self, study, repertory, qlog, name="0"): files = [] if qlog is not None: qlog.put("Export ST file") os.makedirs(os.path.join(repertory, "net"), exist_ok=True) # Write header edges = study.river.edges() edges = list( filter( lambda e: e.is_enable(), edges ) ) for edge in edges: name = f"Reach_{edge.id + 1:>3}".replace(" ", "0") with mage_file_open( os.path.join(repertory, "net", f"{name}.ST"), "w+" ) as f: files.append(str(os.path.join("net", f"{name}.ST"))) cnt_num = 1 for profile in edge.reach.profiles: self._export_ST_profile_header( f, files, profile, cnt_num ) cnt_num += 1 # Points for point in profile.points: self._export_ST_point_line( f, files, point ) # Profile last line f.write(f" 999.9990 999.9990 999.9990\n") return files def _export_ST_profile_header(self, wfile, files, profile, cnt): num = f"{cnt:>6}" c1 = f"{profile.code1:>6}" c2 = f"{profile.code2:>6}" t = f"{len(profile.points):>6}" kp = f"{profile.kp:>12f}"[0:12] pname = profile.name if profile.name == "": # Generate name from profile id prefixed with # 'p' (and replace space char with '0' char) pname = f"p{profile.id:>3}".replace(" ", "0") name = f"{pname:<19}" # Generate sediment additional data if available sediment = "" if profile.sl is not None: if not any(filter(lambda f: ".GRA" in f, files)): files.append(self._gra_file) # Number of layers nl = len(profile.sl) sediment = f" {nl:>3}" # Layers data for layer in profile.sl.layers: sediment += ( f" {layer.height:>10} {layer.d50:>10} " + f"{layer.sigma:>10} " + f"{layer.critical_constraint:>10}" ) # Profile header line wfile.write(f"{num}{c1}{c2}{t} {kp} {pname} {sediment}\n") def _export_ST_point_line(self, wfile, files, point): x = f"{point.x:<12.4f}"[0:12] y = f"{point.y:<12.4f}"[0:12] z = f"{point.z:<12.4f}"[0:12] n = f"{point.name:<3}" # Generate sediment additional data if available sediment = "" prev = point.z if point.sl is not None: # Number of layers nl = len(point.sl) sediment = f"{nl:>3}" # Layers data for layer in point.sl.layers: prev = round(prev - layer.height, 5) sediment += ( f" {prev:>10} {layer.d50:>10} " + f"{layer.sigma:>10} " + f"{layer.critical_constraint:>10}" ) # Point line wfile.write(f"{x} {y} {z} {n} {sediment}\n") @timer def _export_BC(self, t, bounds, repertory, qlog, name="0"): files = [] if len(bounds) == 0: return files if qlog is not None: qlog.put(f"Export {t} file") with mage_file_open(os.path.join(repertory, f"{name}.{t}"), "w+") as f: files.append(f"{name}.{t}") for bound in bounds: if bound.node is None: continue name = f"{bound.node.id:3}".replace(" ", "x") f.write(f"* {bound.node.name} ({name}) {bound.bctype}\n") f.write(f"${name}\n") header = bound.header f.write(f"*{header[0]:>9}|{header[1]:>10}\n") for d in bound.data: v0 = d[0] v1 = d[1] if t in ["HYD", "QSO", "LIM"]: v0 /= 60 # Convert first column to minute f.write(f"{v0:10}{v1:10}\n") return files @timer def _export_bound_cond(self, study, repertory, qlog, name="0"): files = [] lst = study.river.boundary_condition AVA = [] HYD = [] LIM = [] QSO = [] for tab in ["liquid", "solid", "suspenssion"]: for bound in lst.get_tab(tab): if bound.node is None: continue if bound.bctype == "ZD": AVA.append(bound) elif bound.bctype == "TD" or bound.bctype == "PC": HYD.append(bound) elif bound.bctype == "TZ": LIM.append(bound) elif bound.bctype == "SL": QSO.append(bound) files = files + self._export_BC("AVA", AVA, repertory, qlog, name=name) files = files + self._export_BC("HYD", HYD, repertory, qlog, name=name) files = files + self._export_BC("LIM", LIM, repertory, qlog, name=name) files = files + self._export_QSO(QSO, repertory, qlog, name=name) return files # @timer # def _export_LC(self, lateral, repertory, qlog, name="0"): # files = [] # if qlog is not None: # qlog.put(f"Export LAT file") # with mage_file_open( # os.path.join(repertory, f"{name}.LAT"), # "w+" # ) as f: # files.append(f"{name}.LAT") # name = f"{lateral.node.id:3}".replace(" ", "x") # f.write(f"* {lateral.node.name} ({name}) {lateral.bctype}\n") # f.write(f"${name}\n") # header = lateral.header # f.write(f"*{header[0]:>9}|{header[1]:>10}\n") # for d in lateral.data: # f.write(f"{d[0]:1{name}.3f}{d[1]:10.3f}\n") # return files # @timer # def _export_lateral_contrib(self, study, repertory, qlog, name="0"): # files = [] # lst = study.river.lateral_contribution # for tab in ["liquid", "solid", "suspenssion"]: # for lateral in lst.get_tab(tab): # files = files + self._export_LC(lateral, repertory, qlog) # return files @timer def _export_RUG(self, study, repertory, qlog, name="0"): files = [] if qlog is not None: qlog.put("Export RUG file") # Write header with mage_file_open(os.path.join(repertory, f"{name}.RUG"), "w+") as f: files.append(f"{name}.RUG") edges = study.river.edges() edges = list( filter( lambda e: e.is_enable(), edges ) ) id = 1 for edge in edges: frictions = edge.frictions for friction in frictions.frictions: if friction.begin_strickler is None: continue num = f"{id:>3}" bkp = f"{friction.begin_kp:>10.3f}" ekp = f"{friction.end_kp:>10.3f}" # if friction.begin_kp != friction.end_kp: # print("TODO") strickler = friction.begin_strickler coef_1 = f"{strickler.minor:>10.3f}" coef_2 = f"{strickler.medium:>10.3f}" f.write(f"K{num} {bkp}{ekp}{coef_1}{coef_2}\n") id += 1 return files @timer def _export_INI(self, study, repertory, qlog, name="0"): files = [] if qlog is not None: qlog.put("Export INI file") # Write header with mage_file_open(os.path.join(repertory, f"{name}.INI"), "w+") as f: has_ini = False id = 1 reachs = study.river.enable_edges() # TODO put real date... f.write(f"$ date en minutes : 0.00\n") f.write(f"* IB IS discharge elevation kp\n") id = 1 for reach in reachs: cond = study.river.initial_conditions.get(reach) data = cond.data if len(data) == 0: continue has_ini = True id_sec = 1 for d in data: IR = f"{id}" IS = f"{id_sec}" discharge = f"{d['discharge']:>10.5f}" elevation = f"{d['elevation']:>11.6f}" kp = f"{d['kp']:>9.2f}" f.write(f"{IR} {IS} {discharge} {elevation} {kp}\n") id_sec += 1 id += 1 if has_ini: files.append(f"{name}.INI") return files @timer def _export_CAS(self, study, repertory, qlog, name="0"): files = [] reservoirs = study.river.reservoir.lst if len(reservoirs) == 0: return files if qlog is not None: qlog.put("Export CAS file") with mage_file_open(os.path.join(repertory, f"{name}.CAS"), "w+") as f: files.append(f"{name}.CAS") for reservoir in reservoirs: if reservoir.node is None: continue reservoir.sort() node = reservoir.node name = f"{node.id:3}".replace(" ", "x") f.write(f"* {node.name} ({name}) Reservoir\n") f.write(f"${name}\n") f.write(f"*{'Elev(m)':>9}|{'Area(ha)':>10}\n") for d in reservoir.data: v0 = d[0] v1 = d[1] f.write(f"{v0:>10.3f}{v1:>10.3f}\n") return files @timer def _export_SIN(self, study, repertory, qlog, name="0"): files = [] sin_dict = { "ND": "*", "S1": "D", "S2": "T", "S3": "T", "OR": "O", "OC": "B", "OV": "F", "CV": "O", # CheckValve "V1": "V", "V2": "W", "BO": "A", "UD": "X", "PO": "P", } hydraulic_structures = study.river.hydraulic_structures.lst if len(hydraulic_structures) == 0: return files if qlog is not None: qlog.put("Export SIN file") with mage_file_open(os.path.join(repertory, f"{name}.SIN"), "w+") as f: files.append(f"{name}.SIN") for hs in hydraulic_structures: if hs.input_reach is None: continue if not hs.input_reach.is_enable(): continue f.write( '* ouvrage au pk ' + f"{float(hs.input_kp):>12.1f}" + ' ' + hs.name + '\n' ) for bhs in hs.basic_structures: reach_id = study.river.get_edge_id(hs.input_reach) + 1 param_str = ' '.join( [ f'{p:>10.3f}' for p in self._export_SIN_parameters(bhs) ] ) name = bhs.name if name == "": name = f"HS_{bhs.id:>3}".replace(" ", "0") else: name = name.replace(" ", "_") f.write( f"{sin_dict[bhs._type]} " + f"{reach_id} {float(hs.input_kp):>12.3f} " + f"{param_str} {name}\n" ) return files def _export_SIN_parameters(self, bhs): res = [9999.999] * 5 if len(bhs) == 5: res = self._export_SIN_parameters_5(bhs) elif len(bhs) == 4: res = self._export_SIN_parameters_4(bhs) elif len(bhs) == 3: res = self._export_SIN_parameters_3(bhs) return res def _export_SIN_parameters_5(self, bhs): # S2, OR, V1, V2, UD, CV return [ bhs._data[0].value, bhs._data[1].value, bhs._data[2].value, bhs._data[3].value, bhs._data[4].value, ] def _export_SIN_parameters_4(self, bhs): # S3, OC res = [ bhs._data[0].value, bhs._data[1].value, bhs._data[2].value, bhs._data[3].value, 0.0, ] if bhs._type == "T": # S3 res = [0.0] + res[:-1] return res def _export_SIN_parameters_3(self, bhs): # S1, BO if bhs._type == "S1": res = [ bhs._data[0].value, bhs._data[1].value, 0.0, bhs._data[2].value, 9999.99, ] else: res = [ bhs._data[0].value, bhs._data[1].value, bhs._data[2].value, 0.0, 0.0, ] return res @timer def _export_VAR(self, study, repertory, qlog, name="0"): files = [] hydraulic_structures = study.river.hydraulic_structures.lst if len(hydraulic_structures) == 0: return files if qlog is not None: qlog.put("Export VAR file") with mage_file_open(os.path.join(repertory, f"{name}.VAR"), "w+") as f: files.append(f"{name}.VAR") for hs in hydraulic_structures: if hs.input_reach is None: continue if not hs.input_reach.is_enable(): continue for bhs in hs.basic_structures: logger.info(bhs._type) if bhs._type != "CV": continue name = bhs.name if name == "": name = f"HS_{bhs.id:>3}".replace(" ", "0") f.write( f"${name} clapet" ) return files @timer def _export_DEV(self, study, repertory, qlog, name="0"): files = [] if qlog is not None: qlog.put("Export DEV file") with mage_file_open( os.path.join( repertory, f"{name}.DEV" ), "w+" ) as f: reachs = study.river.enable_edges() id = 1 for reach in reachs: f.write(f"YD{id:3}\n") f.write(f"YG{id:3}\n") id += 1 files.append(f"{name}.DEV") return files @timer def _export_REP(self, study, repertory, files, qlog, name="0"): if qlog is not None: qlog.put("Export REP file") # Write header with mage_file_open( os.path.join( repertory, f"{name}.REP" ), "w+" ) as f: f.write("confirmation=non\n") for file in files: EXT = file.split('.')[1] if EXT not in ["ST", "GRA"]: f.write(f"{EXT} {file}\n") f.write("* OUTPUT\n") f.write(f"TRA {name}.TRA\n") f.write(f"BIN {name}.BIN\n") for file in files: EXT = file.split('.')[1] if EXT in ["GRA"]: f.write(f"{EXT} {file}\n") @timer def export(self, study, repertory, qlog=None): self._study = study name = study.name.replace(" ", "_") # Define GRA file name self._gra_file = f"{name}.GRA" self._bin_file = f"{name}.BIN" self._export_ST(study, repertory, qlog, name=name) return True ########### # RESULTS # ########### def read_bin(self, study, repertory, results, qlog=None, name="0"): return @timer def results(self, study, repertory, qlog=None, name="0"): results = Results( study=study, solver=self, repertory=repertory, name=name, ) self.read_bin(study, repertory, results, qlog, name=name) return results ########## # MAGE 7 # ########## class Mage7(Mage): _type = "mage7" def __init__(self, name): super(Mage7, self).__init__(name) self._type = "mage7" @classmethod def default_parameters(cls): lst = super(Mage7, cls).default_parameters() return lst ########## # MAGE 8 # ########## class Mage8(Mage): _type = "mage8" def __init__(self, name): super(Mage8, self).__init__(name) self._type = "mage8" @classmethod def default_parameters(cls): lst = super(Mage8, cls).default_parameters() # Insert new parameters at specific position names = list(map(lambda t: t[0], lst)) i = names.index("mage_precision_reduction_factor_Q") lst.insert(i+1, ("mage_precision_reduction_factor_r", "1")) # Mage parameter for sediment module (added in DB 0.0.4) lst.append(("mage_sediment_masse_volumique", "2650.0")) lst.append(("mage_sediment_angle_repos", "40.0")) lst.append(("mage_sediment_porosity", "0.40")) lst.append(("mage_distance_Han", "0.0")) lst.append(("mage_distance_chargement_d50", "100.0")) lst.append(("mage_distance_chargement_sigma", "100.0")) lst.append(("mage_methode_modification_geometrie", "1")) lst.append(("mage_shields_critique", "1")) lst.append(("mage_shields_correction", "1")) lst.append(("mage_capacite_solide", "1")) lst.append(("mage_pas_de_temps_charriage", "1")) lst.append(("mage_facteur_multiplicateur", "1.0")) return lst ########## # Export # ########## def cmd_args(self, study): lst = super(Mage8, self).cmd_args(study) if study.river.has_sediment(): lst.append("-c=3") return lst @timer def _export_PAR(self, study, repertory, qlog=None, name="0"): files = [] if qlog is not None: qlog.put("Export PAR file") with mage_file_open(os.path.join(repertory, f"{name}.PAR"), "w+") as f: files.append(f"{name}.PAR") params = study.river.get_params(self.type).parameters for p in params: name = p.name\ .replace("all_", "")\ .replace("mage_", "") value = p.value if name in ["command_line_arguments"]: continue if name == "compute_reach_volume_balance": value = "O" if value.lower() == "y" else "N" if name == "init_internal": value = ("p" if value.lower() in ["y", "yes", "true"] else "") logger.debug( f"export: PAR: {name}: {value} ({p.value})" ) f.write(f"{name} {value}\n") return files @timer def _export_NET(self, study, repertory, qlog=None, name="0"): files = [] if qlog is not None: qlog.put("Export NET file") with mage_file_open(os.path.join(repertory, f"{name}.NET"), "w+") as f: files.append(f"{name}.NET") edges = study.river.edges() edges = list( filter( lambda e: e.is_enable(), edges ) ) for e in edges: name = f"Reach_{e.id + 1:>3}".replace(" ", "0") id = name n1 = f"{e.node1.id:3}".replace(" ", "x") n2 = f"{e.node2.id:3}".replace(" ", "x") file = os.path.join("net", name + ".ST") f.write(f"{id} {n1} {n2} {file}\n") return files @timer def _export_QSO(self, bounds, repertory, qlog, name="0"): files = [] if len(bounds) == 0: return files if qlog is not None: qlog.put(f"Export QSO file") with mage_file_open(os.path.join(repertory, f"{name}.QSO"), "w+") as f: files.append(f"{name}.QSO") for bound in bounds: # File header name = f"{bound.node.id:3}".replace(" ", "x") f.write(f"* {bound.node.name} ({name}) {bound.bctype}\n") d50 = bound.d50 sigma = bound.sigma if len(bound.data) == 0: f.write(f"${name} {d50} {sigma} default\n") else: f.write(f"${name} {d50} {sigma}\n") # Table header header = bound.header f.write(f"*{header[0]:>9}|{header[1]:>10}\n") # Data for d in bound.data: f.write(f"{d[0]:10.3f}{d[1]:10.3f}\n") return files @timer def export(self, study, repertory, qlog=None, name="0"): self._study = study name = study.name.replace(" ", "_") # Define GRA file name self._gra_file = f"{name}.GRA" self._bin_file = f"{name}.BIN" # Generate files files = [] files = self._export_ST(study, repertory, qlog, name=name) files = files + self._export_PAR(study, repertory, qlog, name=name) files = files + self._export_NET(study, repertory, qlog, name=name) files = files + \ self._export_bound_cond(study, repertory, qlog, name=name) files = files + self._export_RUG(study, repertory, qlog, name=name) files = files + self._export_INI(study, repertory, qlog, name=name) files = files + self._export_SIN(study, repertory, qlog, name=name) files = files + self._export_VAR(study, repertory, qlog, name=name) files = files + self._export_CAS(study, repertory, qlog, name=name) files = files + self._export_DEV(study, repertory, qlog, name=name) self._export_REP(study, repertory, files, qlog, name=name) return True ########### # RESULTS # ########### @timer def read_bin(self, study, repertory, results, qlog=None, name="0"): fname = os.path.join(repertory, f"{name}.BIN") logger.info(f"read_bin: Start reading '{fname}' ...") with mage_file_open(fname, "r") as f: def newline(): return np.fromfile(f, dtype=np.int32, count=1) def endline(): return np.fromfile(f, dtype=np.int32, count=1) def read_int(size): return np.fromfile( f, dtype=np.int32, count=size) def read_float(size): return np.fromfile( f, dtype=np.float32, count=size) def read_float64(size): return np.fromfile( f, dtype=np.float64, count=size) # Meta data (1st line) newline() data = read_int(3) nb_reach = data[0] nb_profile = data[1] mage_version = data[2] logger.debug(f"read_bin: nb_reach = {nb_reach}") logger.debug(f"read_bin: nb_profile = {nb_profile}") logger.debug(f"read_bin: mage_version = {mage_version}") if mage_version <= 80: msg = ( "Read BIN files: " + f"Possible incompatible mage version '{mage_version}', " + "please check your solver configuration..." ) logger.warning(msg) if qlog is not None: qlog.put("[WARNING] " + msg) results.set("solver_version", f"Mage8 ({mage_version})") results.set("nb_reach", f"{nb_reach}") results.set("nb_profile", f"{nb_profile}") endline() # Reach information (2nd line) newline() reachs = [] iprofiles = {} reach_offset = {} data = read_int(2*nb_reach) for i in range(nb_reach): # Add results reach to reach list r = results.river.add(i) reachs.append(r) # ID of first and last reach profiles i1 = data[2*i] - 1 i2 = data[2*i+1] - 1 # Add profile id correspondance to reach key = (i1, i2) iprofiles[key] = r # Profile ID offset reach_offset[r] = i1 logger.debug(f"read_bin: iprofiles = {iprofiles}") endline() # X (3rd line) newline() _ = read_float(nb_profile) endline() # Z and Y (4th line) newline() _ = read_float(3*nb_profile) endline() # Data newline() def ip_to_r(i): return iprofiles[ next( filter( lambda k: k[0] <= i <= k[1], iprofiles ) ) ] def ip_to_ri(r, i): return i - reach_offset[r] ts = set() end = False while not end: n = read_int(1)[0] timestamp = read_float64(1)[0] key = bytearray(np.fromfile( f, dtype=np.byte, count=1)).decode() data = read_float(n) logger.debug(f"read_bin: timestamp = {timestamp} sec") ts.add(timestamp) if key in ["Z", "Q"]: for i, d in enumerate(data): # Get reach corresponding to profile ID reach = ip_to_r(i) # Get profile id in reach ri = ip_to_ri(reach, i) # Set data for profile RI reach.set(ri, timestamp, key, d) if key == "Z": profile = reach.profile(ri) limits = profile.geometry.get_water_limits(d) reach.set( ri, timestamp, "water_limits", limits ) endline() end = newline().size <= 0 logger.debug(reachs[0].profiles[0]._data) results.set("timestamps", ts) logger.info(f"read_bin: ... end with {len(ts)} timestamp read") @timer def read_gra(self, study, repertory, results, qlog=None, name="0"): if not study.river.has_sediment(): return fname = os.path.join(repertory, f"{name}.GRA") logger.info(f"read_gra: Start reading '{fname}' ...") with mage_file_open(fname, "r") as f: def newline(): return np.fromfile(f, dtype=np.int32, count=1) def endline(): return np.fromfile(f, dtype=np.int32, count=1) def read_int(size): return np.fromfile( f, dtype=np.int32, count=size) def read_float(size): return np.fromfile( f, dtype=np.float32, count=size) def read_float64(size): return np.fromfile( f, dtype=np.float64, count=size) # Meta data (1st line) newline() data = read_int(3) nb_reach = data[0] nb_profile = data[1] mage_version = data[2] logger.debug(f"read_gra: nb_reach = {nb_reach}") logger.debug(f"read_gra: nb_profile = {nb_profile}") logger.debug(f"read_gra: mage_version = {mage_version}") if mage_version < 80: msg = ( "Read GRA files: " + f"Possible incompatible mage version '{mage_version}', " + "please check your solver configuration..." ) logger.warning(msg) if qlog is not None: qlog.put("[WARNING] " + msg) results.set("gra_solver_version", f"Mage8 ({mage_version})") results.set("gra_nb_reach", f"{nb_reach}") results.set("gra_nb_profile", f"{nb_profile}") endline() # Reach information (2nd line) newline() reachs = [] iprofiles = {} reach_offset = {} data = read_int(2*nb_reach) for i in range(nb_reach): # Get results reach to reach list r = results.river.reach(i) reachs.append(r) # ID of first and last reach profiles i1 = data[2*i] - 1 i2 = data[2*i+1] - 1 # Add profile id correspondance to reach key = (i1, i2) iprofiles[key] = r # Profile ID offset reach_offset[r] = i1 logger.debug(f"read_gra: iprofiles = {iprofiles}") endline() # X (3rd line) newline() _ = read_float(nb_profile) endline() # npts (4th line) newline() _ = read_int(nb_profile) endline() # Data def ip_to_r(i): return iprofiles[ next( filter( lambda k: k[0] <= i <= k[1], iprofiles ) ) ] def ip_to_ri(r, i): return i - reach_offset[r] ts = set() end = False newline() while not end: n = read_int(1)[0] timestamp = read_float64(1)[0] with_bedload = read_int(1)[0] logger.debug(f"read_gra: Number of cross section: {n}") logger.debug(f"read_gra: Timestamp: {timestamp}") logger.debug(f"read_gra: Type of bedload: {with_bedload}") endline() npts = [1] * n if with_bedload == 1: newline() npts = read_int(n) endline() sum_npts = sum(npts) logger.debug(f"read_gra: Number of points: {sum_npts}") newline() nsl = read_int(sum_npts) logger.debug(f"read_gra: Number of sedimentary layers: {nsl}") endline() newline() data = read_float64(sum(nsl) * 3) endline() ts.add(timestamp) i_pts = 0 i_data = 0 # Loop on cross section for i in range(n): sec_sl = [] reach = ip_to_r(i) p_i = ip_to_ri(reach, i) # Loop on cross section points for j in range(npts[i]): sl = [] # Loop on sediment layers for k in range(nsl[i_pts]): h = data[i_data] d50 = data[i_data + 1] sigma = data[i_data + 2] sl.append((h, d50, sigma)) i_data += 3 i_pts += 1 sec_sl.append(sl) reach.set(p_i, timestamp, "sl", sec_sl) logger.debug( f"read_gra: data size = {len(data)} ({i_data} readed)" ) end = newline().size <= 0 results.set("sediment_timestamps", ts) logger.info(f"read_gra: ... end with {len(ts)} timestamp read") @timer def results(self, study, repertory, qlog=None, name=None): self._study = study if name is None: name = study.name.replace(" ", "_") results = super(Mage8, self).results(study, repertory, qlog, name=name) self.read_gra(study, repertory, results, qlog, name=name) return results