# PlotKPC.py -- Pamhyr
# Copyright (C) 2023-2024  INRAE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import logging

from functools import reduce

from tools import timer
from View.Tools.PamhyrPlot import PamhyrPlot

from PyQt5.QtCore import (
    QCoreApplication
)

logger = logging.getLogger()


class PlotKPC(PamhyrPlot):
    """Plot of results along a reach: elevation as a function of kp.

    For the selected reach the plot shows the river bottom (with
    sediment layers when the reach has sediment), the water elevation
    at the current timestamp, the maximum water elevation over all
    timestamps, markers on profiles where the water limits reach the
    ends of the profile geometry, and a vertical line on the currently
    selected profile.
    """

    def __init__(self, canvas=None, trad=None, toolbar=None,
                 results=None, reach_id=0, profile_id=0,
                 parent=None):
        """Build the plot.

        Args:
            canvas: Canvas forwarded to PamhyrPlot.
            trad: Translation table forwarded to PamhyrPlot; it is read
                through ``self._trad`` for axis and legend labels.
            toolbar: Toolbar forwarded to PamhyrPlot.
            results: Results object stored as the plot data. May be
                None, in which case nothing is drawn until results are
                assigned.
            reach_id: Index of the reach to display.
            profile_id: Index of the highlighted profile.
            parent: Parent forwarded to PamhyrPlot.
        """
        super(PlotKPC, self).__init__(
            canvas=canvas,
            trad=trad,
            data=results,
            toolbar=toolbar,
            parent=parent
        )

        # Tolerates results=None (the declared default), which
        # previously crashed on `results.get`.
        self._sync_timestamps(results)

        self._current_reach_id = reach_id
        self._current_profile_id = profile_id

        self.label_x = self._trad["unit_kp"]
        self.label_y = self._trad["unit_elevation"]

        self.label_bottom = self._trad["label_bottom"]
        self.label_water = self._trad["label_water"]
        self.label_water_max = self._trad["label_water_max"]

        self._isometric_axis = False

    def _sync_timestamps(self, results):
        """Reset the known timestamps and the current one from results.

        The current timestamp becomes the greatest available timestamp.
        Without results, the timestamp list is empty and the current
        timestamp is None.
        """
        if results is None:
            self._timestamps = []
            self._current_timestamp = None
            return

        self._timestamps = results.get("timestamps")
        self._current_timestamp = max(self._timestamps)

    @property
    def results(self):
        """Results currently displayed (alias of ``self.data``)."""
        return self.data

    @results.setter
    def results(self, results):
        self.data = results
        # Keep `_timestamps` in step with the new results: it is read
        # by sl_compute_initial and draw_water_elevation_overflow.
        self._sync_timestamps(results)

    @timer
    def draw(self, highlight=None):
        """Redraw the whole plot for the current reach.

        Does nothing beyond initialising the axes when there are no
        results. `highlight` is accepted but not used here.
        """
        self.init_axes()

        if self.results is None:
            return

        reach = self.results.river.reach(self._current_reach_id)

        # The bottom is drawn first: it sets `_river_bottom`, which the
        # water fill relies on.
        self.draw_bottom(reach)
        self.draw_water_elevation(reach)
        self.draw_water_elevation_max(reach)
        self.draw_water_elevation_overflow(reach)
        self.draw_current(reach)

        # self.enable_legend()

        self.idle()
        self._init = True

    def draw_bottom(self, reach):
        """Draw the river bottom, with sediment layers when available."""
        if reach.has_sediment():
            self.draw_bottom_with_bedload(reach)
        else:
            self.draw_bottom_geometry(reach)

    def _compute_layer_elevations(self, reach, initial_sl):
        """Return layer elevation lines at the current timestamp.

        The returned list holds one list of elevations (one value per
        profile) per line; the last line is the river bottom.
        """
        z_min = reach.geometry.get_z_min()
        current_sl = self.sl_compute_current(reach)

        max_sl_num = reduce(
            lambda acc, sl: max(acc, len(sl)),
            current_sl, 0
        )

        sl_init, sl = self.sl_completed_layers(
            initial_sl, current_sl, max_sl_num
        )

        z_sl = self.sl_apply_z_min_to_initial(sl_init, z_min)
        d_sl = self.sl_compute_diff(sl_init, sl)

        return list(reversed(self.sl_apply_diff(reach, z_sl, d_sl)))

    def draw_bottom_with_bedload(self, reach):
        """Draw one line per sediment layer plus the river bottom.

        Stores the plotted lines in `line_kp_sl`, the initial layers in
        `_initial_sl` and the bottom elevations in `_river_bottom`.
        """
        kp = reach.geometry.get_kp()

        initial_sl = self.sl_compute_initial(reach)
        final_z_sl = self._compute_layer_elevations(reach, initial_sl)

        last = len(final_z_sl) - 1
        self.line_kp_sl = []
        for i, z in enumerate(final_z_sl):
            is_bottom = (i == last)
            # The last line is the river bottom: solid, bottom colour.
            # Other layers cycle through the colours, moving to the
            # next line style once every colour has been used.
            line, = self.canvas.axes.plot(
                kp, z,
                linestyle=(
                    "solid" if is_bottom
                    else self.linestyle[1:][i // len(self.colors)]
                ),
                lw=1.,
                color=(
                    self.color_plot_river_bottom if is_bottom
                    else self.colors[i % len(self.colors)]
                )
            )
            self.line_kp_sl.append(line)

        self._initial_sl = initial_sl
        self._river_bottom = final_z_sl[-1]

    def sl_compute_initial(self, reach):
        """
        Get SL list for each profile at initial time (initial data)
        """
        first_timestamp = min(self._timestamps)
        return [
            p.get_ts_key(first_timestamp, "sl")[0]
            for p in reach.profiles
        ]

    def sl_compute_current(self, reach):
        """
        Get SL list for each profile at current time
        """
        return [
            p.get_ts_key(self._current_timestamp, "sl")[0]
            for p in reach.profiles
        ]

    def sl_completed_layers(self, initial_sl, current_sl, max_sl_num):
        """Transpose per-profile layers into per-layer lists, padded.

        Args:
            initial_sl: For each profile, its layers at initial time;
                the first item of each layer is the value kept.
            current_sl: Same as `initial_sl` at the current time.
            max_sl_num: Number of layers to produce.

        Returns:
            A tuple ``(sl_init, sl)`` of `max_sl_num` lists, each with
            one value per profile. A profile with no layer at a given
            index contributes 0.
        """
        sl = []
        sl_init = []

        for i in range(max_sl_num):
            cur = []
            cur_init = []

            for current, initial in zip(current_sl, initial_sl):
                # Compare the layer index with this profile's own
                # layer count, not with the number of profiles.
                cur.append(current[i][0] if i < len(current) else 0)
                cur_init.append(initial[i][0] if i < len(initial) else 0)

            sl.append(cur)
            sl_init.append(cur_init)

        return sl_init, sl

    def sl_apply_z_min_to_initial(self, sl_init, z_min):
        """
        Compute sediment layer from initial data in function to profile
        z_min

        The result starts with `z_min` itself; each following line is
        the previous line minus the corresponding initial layer.
        """
        z_sl = [z_min]
        for layer in sl_init:
            z_sl.append([z - h for h, z in zip(layer, z_sl[-1])])
        return z_sl

    def sl_compute_diff(self, sl_init, sl):
        """Return, layer by layer, initial value minus current value."""
        return [
            [z0 - zi for z0, zi in zip(ln0, lni)]
            for ln0, lni in zip(sl_init, sl)
        ]

    def sl_apply_diff(self, reach, z_sl, d_sl):
        """Subtract the layer differences `d_sl` from the lines `z_sl`."""
        # HACK: Add dummy data for last layer
        f = [0 for _ in reach.profiles]

        return [
            [zn - dn for zn, dn in zip(z, d)]
            for z, d in zip(z_sl, d_sl + [f])
        ]

    def draw_bottom_geometry(self, reach):
        """Draw the river bottom from the geometry minimum elevations."""
        kp = reach.geometry.get_kp()
        z_min = reach.geometry.get_z_min()

        self.line_kp_zmin = self.canvas.axes.plot(
            kp, z_min,
            color=self.color_plot_river_bottom,
            lw=1.
        )

        self._river_bottom = z_min

    def _current_water_elevation(self, reach):
        """Return the "Z" value of each profile at the current time."""
        return [
            p.get_ts_key(self._current_timestamp, "Z")
            for p in reach.profiles
        ]

    def _fill_water_zone(self, kp, water_z):
        """Fill the area between the river bottom and the water line."""
        self.water_fill = self.canvas.axes.fill_between(
            kp, self._river_bottom, water_z,
            color=self.color_plot_river_water_zone,
            alpha=0.7, interpolate=True
        )

    def draw_water_elevation(self, reach):
        """Draw the water line and water zone at the current timestamp.

        Nothing is drawn when the reach geometry has no profile.
        """
        if len(reach.geometry.profiles) == 0:
            return

        kp = reach.geometry.get_kp()
        water_z = self._current_water_elevation(reach)

        self.water = self.canvas.axes.plot(
            kp, water_z, lw=1.,
            color=self.color_plot_river_water,
        )

        self._fill_water_zone(kp, water_z)

    def draw_water_elevation_max(self, reach):
        """Draw, dotted, the maximum "Z" value of each profile.

        Nothing is drawn when the reach geometry has no profile.
        """
        if len(reach.geometry.profiles) == 0:
            return

        kp = reach.geometry.get_kp()
        water_z = [max(p.get_key("Z")) for p in reach.profiles]

        self.canvas.axes.plot(
            kp, water_z, lw=1.,
            color=self.color_plot_river_water,
            linestyle='dotted',
        )

    def draw_current(self, reach):
        """Draw a vertical line on the currently selected profile."""
        kp = reach.geometry.get_kp()
        z_min = reach.geometry.get_z_min()
        z_max = reach.geometry.get_z_max()

        cid = self._current_profile_id

        self.profile, = self.canvas.axes.plot(
            [kp[cid], kp[cid]],
            [z_max[cid], z_min[cid]],
            color=self.color_plot,
            lw=1.
        )

    def draw_water_elevation_overflow(self, reach):
        """Mark profiles whose water limits reach a geometry end point.

        For each profile, the water limits are read at the first
        timestamp whose "Z" equals the profile maximum; a cross is
        plotted at the maximum elevation when either limit is the
        first or last point of the profile geometry.
        """
        overflow = []

        for profile in reach.profiles:
            z_max = max(profile.get_key("Z"))
            z_max_ts = 0

            # Find the first timestamp at which the maximum is reached.
            for ts in self._timestamps:
                if profile.get_ts_key(ts, "Z") == z_max:
                    z_max_ts = ts
                    break

            pt_left, pt_right = profile.get_ts_key(z_max_ts, "water_limits")

            if (self.is_overflow_point(profile, pt_left)
                    or self.is_overflow_point(profile, pt_right)):
                overflow.append((profile, z_max))

        for profile, z in overflow:
            self.canvas.axes.plot(
                profile.kp, z,
                lw=1.,
                color=self.color_plot,
                markersize=3,
                marker='x'
            )

    def is_overflow_point(self, profile, point):
        """Tell whether `point` is the first or last geometry point."""
        left_limit = profile.geometry.point(0)
        right_limit = profile.geometry.point(
            profile.geometry.number_points - 1
        )

        return (
            point == left_limit
            or point == right_limit
        )

    def set_reach(self, reach_id):
        """Select another reach, reset the profile and redraw."""
        self._current_reach_id = reach_id
        self._current_profile_id = 0
        self.draw()

    def set_profile(self, profile_id):
        """Select another profile and move the profile marker."""
        self._current_profile_id = profile_id
        self.update_current()

    def set_timestamp(self, timestamp):
        """Select another timestamp and refresh the plot."""
        self._current_timestamp = timestamp
        self.update()

    def update(self):
        """Refresh the time-dependent artists for the current timestamp.

        Falls back to a full draw when the plot has not been drawn yet.
        """
        if not self._init:
            # A full draw already uses the current timestamp, and it
            # copes with missing results; nothing more to update.
            self.draw()
            return

        reach = self.results.river.reach(self._current_reach_id)
        if reach.has_sediment():
            self.update_bottom_with_bedload()

        self.update_water_elevation()
        self.update_idle()

    def update_water_elevation(self):
        """Update the water line and rebuild the water zone fill."""
        reach = self.results.river.reach(self._current_reach_id)
        kp = reach.geometry.get_kp()
        water_z = self._current_water_elevation(reach)

        self.water[0].set_data(kp, water_z)

        # The previous fill is removed and a new one is created.
        self.water_fill.remove()
        self._fill_water_zone(kp, water_z)

    def update_current(self):
        """Move the vertical marker to the currently selected profile."""
        reach = self.results.river.reach(self._current_reach_id)
        kp = reach.geometry.get_kp()
        z_min = reach.geometry.get_z_min()
        z_max = reach.geometry.get_z_max()

        cid = self._current_profile_id
        self.profile.set_data(
            [kp[cid], kp[cid]],
            [z_max[cid], z_min[cid]]
        )

        self.canvas.figure.canvas.draw_idle()

    def update_bottom_with_bedload(self):
        """Update the sediment layer lines for the current timestamp.

        Reuses the initial layers stored by draw_bottom_with_bedload.
        """
        reach = self.results.river.reach(self._current_reach_id)
        kp = reach.geometry.get_kp()

        final_z_sl = self._compute_layer_elevations(
            reach, self._initial_sl
        )

        for i, z in enumerate(final_z_sl):
            self.line_kp_sl[i].set_data(kp, z)

        self._river_bottom = final_z_sl[-1]