Commit 18be4af8 authored by Le Roux Erwan's avatar Le Roux Erwan
Browse files

[EUROCODE GRAPH] update burn in percentage to 0.5 (update test). delete test...

[EUROCODE GRAPH] update burn in percentage to 0.5 (update test). delete test region eurocode. update eurocode graph.
parent 2a312ff5
No related merge requests found
Showing with 229 additions and 236 deletions
+229 -236
......@@ -44,7 +44,7 @@ class AbstractEurocodeRegion(object):
return 3.5, -2.45
def plot_max_loading(self, ax, altitudes):
old_label = 'Eurocode computed in {}'.format(LAST_YEAR_FOR_EUROCODE)
# old_label = 'Eurocode computed in {}'.format(LAST_YEAR_FOR_EUROCODE)
new_label = 'Eurocode standards'
ax.plot(altitudes, [self.eurocode_max_loading(altitude) for altitude in altitudes],
label=new_label, color='k')
......
from enum import Enum
from typing import List
import numpy as np
......@@ -11,16 +12,26 @@ from extreme_fit.estimator.abstract_estimator import AbstractEstimator
from extreme_fit.estimator.margin_estimator.abstract_margin_estimator import LinearMarginEstimator
from extreme_fit.estimator.utils import load_margin_function
from extreme_fit.model.margin_model.linear_margin_model.abstract_temporal_linear_margin_model import \
AbstractTemporalLinearMarginModel
AbstractTemporalLinearMarginModel, TemporalMarginFitMethod
from extreme_fit.model.margin_model.margin_function.linear_margin_function import LinearMarginFunction
from extreme_fit.model.result_from_model_fit.result_from_extremes import ResultFromExtremes
from extreme_fit.model.result_from_model_fit.result_from_extremes import ResultFromExtremes, ResultFromBayesianExtremes
def compute_eurocode_level_uncertainty(last_year_for_the_data, smooth_maxima_x_y, model_class):
class ConfidenceIntervalMethodFromExtremes(Enum):
# Confidence interval from the ci function
bayes = 0
normal = 1
boot = 2
proflik = 3
# Confidence interval from my functions
my_bayes = 4
def compute_eurocode_level_uncertainty(last_year_for_the_data, smooth_maxima_x_y, model_class, ci_method):
years, smooth_maxima = smooth_maxima_x_y
idx = years.index(last_year_for_the_data) + 1
years, smooth_maxima = years[:idx], smooth_maxima[:idx]
return EurocodeLevelUncertaintyFromExtremes.from_maxima_years_model_class(smooth_maxima, years, model_class)
return EurocodeLevelUncertaintyFromExtremes.from_maxima_years_model_class(smooth_maxima, years, model_class, ci_method)
class EurocodeLevelUncertaintyFromExtremes(object):
......@@ -31,28 +42,47 @@ class EurocodeLevelUncertaintyFromExtremes(object):
self.poster_uncertainty_interval = poster_uncertainty_interval
@classmethod
def from_estimator_extremes(cls, estimator_extremes: LinearMarginEstimator):
extractor = ExtractEurocodeReturnLevelFromExtremes(estimator_extremes, cls.YEAR_OF_INTEREST)
def from_estimator_extremes(cls, estimator_extremes: LinearMarginEstimator,
ci_method: ConfidenceIntervalMethodFromExtremes):
extractor = ExtractEurocodeReturnLevelFromExtremes(estimator_extremes, ci_method, cls.YEAR_OF_INTEREST)
return cls(extractor.posterior_mean_eurocode_return_level_for_the_year_of_interest,
extractor.posterior_eurocode_return_level_uncertainty_interval_for_the_year_of_interest)
@classmethod
def from_maxima_years_model_class(cls, maxima, years, model_class):
def from_maxima_years_model_class(cls, maxima, years, model_class,
ci_method=ConfidenceIntervalMethodFromExtremes.bayes):
# Load coordinates and dataset
coordinates, dataset = load_temporal_coordinates_and_dataset(maxima, years)
# Select fit method depending on the ci_method
if ci_method in [ConfidenceIntervalMethodFromExtremes.bayes,
ConfidenceIntervalMethodFromExtremes.my_bayes]:
fit_method = TemporalMarginFitMethod.extremes_fevd_bayesian
else:
fit_method = TemporalMarginFitMethod.extremes_fevd_mle
# Fitted estimator
fitted_estimator = fitted_linear_margin_estimator(model_class, coordinates, dataset, starting_year=1958,
fit_method=AbstractTemporalLinearMarginModel.EXTREMES_FEVD_BAYESIAN_FIT_METHOD_STR)
fit_method=fit_method)
# Load object from result from extremes
return cls.from_estimator_extremes(fitted_estimator)
return cls.from_estimator_extremes(fitted_estimator, ci_method)
class ExtractFromExtremes(object):
pass
class ExtractEurocodeReturnLevelFromExtremes(object):
ALPHA_CONFIDENCE_INTERVAL_UNCERTAINTY = 0.05
def __init__(self, estimator: LinearMarginEstimator, year_of_interest: int = YEAR_OF_INTEREST_FOR_RETURN_LEVEL):
def __init__(self, estimator: LinearMarginEstimator,
ci_method,
year_of_interest: int = YEAR_OF_INTEREST_FOR_RETURN_LEVEL,
alpha_for_confidence_interval: int = ALPHA_CONFIDENCE_INTERVAL_UNCERTAINTY,
):
self.estimator = estimator
self.result_from_fit = self.estimator.result_from_model_fit # type: ResultFromExtremes
self.result_from_fit = self.estimator.result_from_model_fit # type: ResultFromBayesianExtremes
assert isinstance(self.result_from_fit, ResultFromBayesianExtremes)
self.year_of_interest = year_of_interest
self.alpha_for_confidence_interval = alpha_for_confidence_interval
@property
def margin_functions_from_fit(self) -> List[LinearMarginFunction]:
......@@ -65,8 +95,8 @@ class ExtractEurocodeReturnLevelFromExtremes(object):
@property
def gev_params_from_fit_for_year_of_interest(self) -> List[GevParams]:
return [m.get_gev_params(coordinate=np.array([self.year_of_interest]), is_transformed=False)
for m in self.margin_functions_from_fit]
return [margin_function.get_gev_params(coordinate=np.array([self.year_of_interest]), is_transformed=False)
for margin_function in self.margin_functions_from_fit]
@cached_property
def posterior_eurocode_return_level_samples_for_year_of_interest(self) -> np.ndarray:
......@@ -80,6 +110,7 @@ class ExtractEurocodeReturnLevelFromExtremes(object):
@property
def posterior_eurocode_return_level_uncertainty_interval_for_the_year_of_interest(self):
# Bottom and upper quantile correspond to the quantile
bottom_and_upper_quantile = (0.025, 0.975)
bottom_quantile = self.alpha_for_confidence_interval / 2
bottom_and_upper_quantile = (bottom_quantile, 1 - bottom_quantile)
return [np.quantile(self.posterior_eurocode_return_level_samples_for_year_of_interest, q=q)
for q in bottom_and_upper_quantile]
......@@ -3,58 +3,80 @@ from typing import Dict, List
import matplotlib.pyplot as plt
from experiment.eurocode_data.eurocode_return_level_uncertainties import EurocodeLevelUncertaintyFromExtremes
from experiment.eurocode_data.massif_name_to_departement import DEPARTEMENT_TYPES
from experiment.eurocode_data.massif_name_to_departement import DEPARTEMENT_TYPES, massif_name_to_eurocode_region
from experiment.eurocode_data.utils import EUROCODE_QUANTILE, EUROCODE_ALTITUDES
from experiment.meteo_france_data.scm_models_data.visualization.utils import create_adjusted_axes
from root_utils import get_display_name_from_object_type
def plot_model_name_to_dep_to_ordered_return_level_uncertainties(
dep_to_model_name_to_ordered_return_level_uncertainties, altitudes, show=True):
# Create a 9 x 9 plot
axes = create_adjusted_axes(3, 3)
axes = list(axes.flatten())
ax6 = axes[5]
ax9 = axes[8]
axes.remove(ax6)
axes.remove(ax9)
ax_to_departement = dict(zip(axes, DEPARTEMENT_TYPES[::-1]))
for ax, departement in ax_to_departement.items():
plot_dep_to_model_name_dep_to_ordered_return_level_uncertainties(ax, departement,
altitudes,
dep_to_model_name_to_ordered_return_level_uncertainties[
departement]
)
ax6.remove()
ax9.remove()
plt.suptitle('50-year return levels of snow load for all French Alps departements. \n'
'Comparison between the maximum EUROCODE in the departement\n'
'and the maximum return level found (in 2017 for the non-stationary model) for the massif in the departement')
def get_label_name(model_name, ci_method_name: str):
is_non_stationary = model_name == 'NonStationary'
model_symbol = '{\mu_1, \sigma_1}' if is_non_stationary else '0'
parameter = ', 2017' if is_non_stationary else ''
model_name = ' $ \widehat{z_p}(\\boldsymbol{\\theta_{\mathcal{M}_'
model_name += model_symbol
model_name += '}}'
model_name += parameter
model_name += ')_{ \\textrm{' + ci_method_name.upper() + '}} $ '
return model_name
def get_model_name(model_class):
return get_display_name_from_object_type(model_class).split('Stationary')[0] + 'Stationary'
def plot_massif_name_to_model_name_to_uncertainty_method_to_ordered_dict(d, nb_massif_names, nb_model_names, show=True):
"""
Rows correspond to massif names
Columns correspond to stationary/non stationary model name for a given date
Uncertainty method correpsond to the different plot on the graph
:return:
"""
axes = create_adjusted_axes(nb_massif_names, nb_model_names)
if nb_massif_names == 1:
axes = [axes]
for ax, (massif_name, model_name_to_uncertainty_level) in zip(axes, d.items()):
plot_model_name_to_uncertainty_method_to_ordered_dict(model_name_to_uncertainty_level,
massif_name, ax)
plt.suptitle('50-year return levels of extreme snow loads in France for several confiance interval methods.')
if show:
plt.show()
def plot_dep_to_model_name_dep_to_ordered_return_level_uncertainties(ax, dep_class,
altitudes,
model_name_to_ordered_return_level_uncertainties:
Dict[str, List[
EurocodeLevelUncertaintyFromExtremes]]):
colors = ['red', 'blue', 'green']
def plot_model_name_to_uncertainty_method_to_ordered_dict(d, massif_name, axes):
if len(d) == 1:
axes = [axes]
for ax, (model_name, uncertainty_method_to_ordered_dict) in zip(axes, d.items()):
plot_label_to_ordered_return_level_uncertainties(ax, massif_name, model_name,
uncertainty_method_to_ordered_dict)
def plot_label_to_ordered_return_level_uncertainties(ax, massif_name, model_name,
label_to_ordered_return_level_uncertainties:
Dict[str, List[
EurocodeLevelUncertaintyFromExtremes]]):
""" Generic function that might be used by many other more global functions"""
colors = ['tab:blue', 'tab:orange', 'tab:purple', 'tab:olive']
alpha = 0.2
# Display the EUROCODE return level
dep_object = dep_class()
dep_object.eurocode_region.plot_max_loading(ax, altitudes=altitudes)
eurocode_region = massif_name_to_eurocode_region[massif_name]()
# Display the return level from model class
for color, (model_name, ordered_return_level_uncertaines) in zip(colors,
model_name_to_ordered_return_level_uncertainties.items()):
for j, (color, (label, l)) in enumerate(zip(colors,label_to_ordered_return_level_uncertainties.items())):
altitudes, ordered_return_level_uncertaines = zip(*l)
# Plot eurocode standards only for the first loop
if j == 0:
eurocode_region.plot_max_loading(ax, altitudes=altitudes)
mean = [r.posterior_mean for r in ordered_return_level_uncertaines]
ax.plot(altitudes, mean, '-', color=color, label=model_name)
ci_method_name = str(label).split('.')[1].replace('_', ' ')
ax.plot(altitudes, mean, '-', color=color, label=get_label_name(model_name, ci_method_name))
lower_bound = [r.poster_uncertainty_interval[0] for r in ordered_return_level_uncertaines]
upper_bound = [r.poster_uncertainty_interval[1] for r in ordered_return_level_uncertaines]
ax.fill_between(altitudes, lower_bound, upper_bound, color=color, alpha=alpha)
ax.legend(loc=2)
ax.set_ylim([0.0, 4.0])
ax.set_title(str(dep_object))
ax.set_title(massif_name + ' ' + model_name)
ax.set_ylabel('50-year return level (N $m^-2$)')
ax.set_xlabel('Altitude (m)')
import time
from collections import OrderedDict
from experiment.eurocode_data.eurocode_visualizer import plot_model_name_to_dep_to_ordered_return_level_uncertainties
from experiment.eurocode_data.massif_name_to_departement import DEPARTEMENT_TYPES
from experiment.eurocode_data.eurocode_return_level_uncertainties import ConfidenceIntervalMethodFromExtremes
from experiment.eurocode_data.eurocode_visualizer import \
plot_massif_name_to_model_name_to_uncertainty_method_to_ordered_dict, get_model_name
from experiment.eurocode_data.massif_name_to_departement import DEPARTEMENT_TYPES, MASSIF_NAMES_ALPS
from experiment.eurocode_data.utils import EUROCODE_ALTITUDES, LAST_YEAR_FOR_EUROCODE
from experiment.meteo_france_data.scm_models_data.crocus.crocus import CrocusSwe3Days
from experiment.meteo_france_data.scm_models_data.visualization.hypercube_visualization.altitude_hypercube_visualizer import \
......@@ -20,17 +22,9 @@ mpl.rcParams['text.usetex'] = True
mpl.rcParams['text.latex.preamble'] = [r'\usepackage{amsmath}']
def dep_to_ordered_return_level_uncertainties(model_class, last_year_for_the_data, altitudes):
model_type = get_display_name_from_object_type(model_class).split('Stationary')[0] + 'Stationary'
# model_name += ' 1958-' + str(last_year_for_the_data)
is_non_stationary = model_type == 'NonStationary'
model_symbol = '{\mu_1, \sigma_1}' if is_non_stationary else '0'
parameter = ', 2017' if is_non_stationary else ''
model_name = ' $ \widehat{q_{\\textrm{GEV}}(\\boldsymbol{\\theta_{\mathcal{M}_'
model_name += model_symbol
model_name += '}}'
model_name += parameter
model_name += ')}_{ \\textrm{MMSE}} $ ' + '({})'.format(model_type)
def massif_name_to_ordered_return_level_uncertainties(model_class, last_year_for_the_data, altitudes, massif_names, uncertainty_methods):
# Load model name
model_name = get_model_name(model_class)
# Load altitude visualizer
altitude_visualizer = load_altitude_visualizer(AltitudeHypercubeVisualizer, altitudes=altitudes,
last_starting_year=None, nb_data_reduced_for_speed=False,
......@@ -38,49 +32,58 @@ def dep_to_ordered_return_level_uncertainties(model_class, last_year_for_the_dat
exact_starting_year=1958,
first_starting_year=None,
study_classes=[CrocusSwe3Days],
trend_test_class=None)
trend_test_class=None) # type: AltitudeHypercubeVisualizer
# Loop on the data
assert isinstance(altitude_visualizer.tuple_to_study_visualizer, OrderedDict)
dep_to_ordered_return_level_uncertainty = {dep: [] for dep in DEPARTEMENT_TYPES}
massif_name_to_ordered_eurocode_level_uncertainty = {massif_name: {ci_method: [] for ci_method in uncertainty_methods} for massif_name in massif_names}
for altitude, visualizer in altitude_visualizer.tuple_to_study_visualizer.items():
print('{} processing altitude = {} '.format(model_name, altitude))
dep_to_return_level_uncertainty = visualizer.dep_class_to_eurocode_level_uncertainty(model_class,
last_year_for_the_data)
for dep, return_level_uncertainty in dep_to_return_level_uncertainty.items():
dep_to_ordered_return_level_uncertainty[dep].append(return_level_uncertainty)
for ci_method in uncertainty_methods:
d = visualizer.massif_name_to_altitude_and_eurocode_level_uncertainty(model_class, last_year_for_the_data, massif_names, ci_method)
# Append the altitude one by one
for massif_name, return_level_uncertainty in d.items():
massif_name_to_ordered_eurocode_level_uncertainty[massif_name][ci_method].append(return_level_uncertainty)
return {model_name: massif_name_to_ordered_eurocode_level_uncertainty}
return {model_name: dep_to_ordered_return_level_uncertainty}
def main_drawing():
fast_plot = [True, False][0]
# Select parameters
fast_plot = [True, False][1]
massif_names = MASSIF_NAMES_ALPS[:]
model_class_and_last_year = [
(StationaryTemporalModel, LAST_YEAR_FOR_EUROCODE),
(StationaryTemporalModel, 2017),
(NonStationaryLocationAndScaleTemporalModel, 2017),
][1:]
altitudes = EUROCODE_ALTITUDES[:]
uncertainty_methods = [ConfidenceIntervalMethodFromExtremes.my_bayes, ConfidenceIntervalMethodFromExtremes.bayes]
if fast_plot:
model_class_and_last_year = model_class_and_last_year[:2]
altitudes = altitudes[:2]
model_class_and_last_year = model_class_and_last_year[:1]
altitudes = altitudes[2:4]
massif_names = massif_names[:1]
uncertainty_methods = uncertainty_methods[:1]
model_name_to_dep_to_ordered_return_level = {}
model_name_to_massif_name_to_ordered_return_level = {}
for model_class, last_year_for_the_data in model_class_and_last_year:
start = time.time()
model_name_to_dep_to_ordered_return_level.update(
dep_to_ordered_return_level_uncertainties(model_class, last_year_for_the_data, altitudes))
model_name_to_massif_name_to_ordered_return_level.update(
massif_name_to_ordered_return_level_uncertainties(model_class, last_year_for_the_data, altitudes, massif_names, uncertainty_methods))
duration = time.time() - start
print(model_class, duration)
# Transform the dictionary into the desired format
dep_to_model_name_to_ordered_return_level_uncertainties = {}
for dep in DEPARTEMENT_TYPES:
d2 = {model_name: model_name_to_dep_to_ordered_return_level[model_name][dep] for model_name in
model_name_to_dep_to_ordered_return_level.keys()}
dep_to_model_name_to_ordered_return_level_uncertainties[dep] = d2
massif_name_to_model_name_to_ordered_return_level_uncertainties = {}
for massif_name in massif_names:
d2 = {model_name: model_name_to_massif_name_to_ordered_return_level[model_name][massif_name] for model_name in
model_name_to_massif_name_to_ordered_return_level.keys()}
massif_name_to_model_name_to_ordered_return_level_uncertainties[massif_name] = d2
# Plot graph
plot_model_name_to_dep_to_ordered_return_level_uncertainties(
dep_to_model_name_to_ordered_return_level_uncertainties, altitudes, show=True)
plot_massif_name_to_model_name_to_uncertainty_method_to_ordered_dict(
massif_name_to_model_name_to_ordered_return_level_uncertainties, nb_massif_names=len(massif_names),
nb_model_names=len(model_class_and_last_year), show=True)
if __name__ == '__main__':
......
......@@ -61,6 +61,8 @@ massif_name_to_departement_objects = {m: [d() for d in deps] for m, deps in
DEPARTEMENT_TYPES = [HauteSavoie, Savoie, Isere, Drome, HautesAlpes, AlpesMaritimes, AlpesDeHauteProvence]
MASSIF_NAMES_ALPS = list(massif_name_to_eurocode_region.keys())
dep_class_to_massif_names = {dep: [k for k, v in massif_name_to_departement_types.items() if dep in v]
for dep in DEPARTEMENT_TYPES
}
......
......@@ -2,7 +2,7 @@
# Eurocode quantile correspond to a 50 year return period
EUROCODE_QUANTILE = 0.98
# Altitudes (between low and mid altitudes) < 2000m
EUROCODE_ALTITUDES = [900, 1200, 1500, 1800]
EUROCODE_ALTITUDES = [0, 300, 600, 900, 1200, 1500, 1800]
# Last year taken into account for the Eurocode
# Date of publication was 2014, therefore the winter 2013/2014 could not have been measured
# Therefore, the winter 2012/2013 was the last one. Thus, 2012 is the last year for the Eurocode
......
......@@ -3,7 +3,7 @@ import os.path as op
from collections import OrderedDict
from multiprocessing.pool import Pool
from random import sample, seed
from typing import Dict
from typing import Dict, Tuple
import math
import matplotlib.pyplot as plt
......@@ -355,28 +355,30 @@ class StudyVisualizer(VisualizationParameters):
start_year, stop_year = self.study.start_year_and_stop_year
return list(range(start_year, stop_year))
def massif_name_to_eurocode_level_uncertainty(self, model_class, last_year_for_the_data) -> Dict[str, EurocodeLevelUncertaintyFromExtremes]:
arguments = [[last_year_for_the_data, self.smooth_maxima_x_y(massif_id), model_class] for massif_id, _ in enumerate(self.study.study_massif_names)]
def massif_name_to_altitude_and_eurocode_level_uncertainty(self, model_class, last_year_for_the_data, massif_names, ci_method) -> Dict[str, Tuple[int, EurocodeLevelUncertaintyFromExtremes]]:
arguments = [[last_year_for_the_data, self.smooth_maxima_x_y(massif_id), model_class, ci_method] for massif_id, _ in enumerate(massif_names)]
self.multiprocessing = False
if self.multiprocessing:
with Pool(NB_CORES) as p:
res = p.starmap(compute_eurocode_level_uncertainty, arguments)
else:
res = [compute_eurocode_level_uncertainty(*argument) for argument in arguments]
massif_name_to_eurocode_return_level_uncertainty = OrderedDict(zip(self.study.study_massif_names, res))
res_and_altitude = [(self.study.altitude, r) for r in res]
massif_name_to_eurocode_return_level_uncertainty = OrderedDict(zip(self.study.study_massif_names, res_and_altitude))
return massif_name_to_eurocode_return_level_uncertainty
def dep_class_to_eurocode_level_uncertainty(self, model_class, last_year_for_the_data):
""" Take the max with respect to the posterior mean for each departement """
massif_name_to_eurocode_level_uncertainty = self.massif_name_to_eurocode_level_uncertainty(model_class,
last_year_for_the_data)
dep_class_to_eurocode_level_uncertainty = {}
for dep_class, massif_names in dep_class_to_massif_names.items():
# Filter the couple of interest
couples = [(k, v) for k, v in massif_name_to_eurocode_level_uncertainty.items() if k in massif_names]
assert len(couples) > 0
massif_name, eurocode_level_uncertainty = max(couples, key=lambda c: c[1].posterior_mean)
dep_class_to_eurocode_level_uncertainty[dep_class] = eurocode_level_uncertainty
return dep_class_to_eurocode_level_uncertainty
# def dep_class_to_eurocode_level_uncertainty(self, model_class, last_year_for_the_data):
# """ Take the max with respect to the posterior mean for each departement """
# massif_name_to_eurocode_level_uncertainty = self.massif_name_to_eurocode_level_uncertainty(model_class,
# last_year_for_the_data)
# dep_class_to_eurocode_level_uncertainty = {}
# for dep_class, massif_names in dep_class_to_massif_names.items():
# # Filter the couple of interest
# couples = [(k, v) for k, v in massif_name_to_eurocode_level_uncertainty.items() if k in massif_names]
# assert len(couples) > 0
# massif_name, eurocode_level_uncertainty = max(couples, key=lambda c: c[1].posterior_mean)
# dep_class_to_eurocode_level_uncertainty[dep_class] = eurocode_level_uncertainty
# return dep_class_to_eurocode_level_uncertainty
@cached_property
def massif_name_to_detailed_scores(self):
......
......@@ -21,7 +21,7 @@ def main_non_stationary_model_comparison():
stop_loop = False
for altitude in POSTER_ALTITUDES[:]:
for trend_test_class in [GevLocationTrendTest, GevScaleTrendTest, GevLocationAndScaleTrendTest,
ComparisonAgainstMu, ComparisonAgainstSigma][:3]:
ComparisonAgainstMu, ComparisonAgainstSigma][:]:
vizualiser = get_full_altitude_visualizer(Altitude_Hypercube_Year_Visualizer, altitude=altitude,
exact_starting_year=1958, reduce_strength_array=False,
trend_test_class=trend_test_class,
......
......@@ -9,7 +9,7 @@ from experiment.trend_analysis.univariate_test.utils import load_temporal_coordi
fitted_linear_margin_estimator
from extreme_fit.estimator.margin_estimator.abstract_margin_estimator import LinearMarginEstimator
from extreme_fit.model.margin_model.linear_margin_model.abstract_temporal_linear_margin_model import \
AbstractTemporalLinearMarginModel
AbstractTemporalLinearMarginModel, TemporalMarginFitMethod
from extreme_fit.model.margin_model.linear_margin_model.temporal_linear_margin_models import \
StationaryTemporalModel
from extreme_fit.model.utils import SafeRunException
......@@ -25,9 +25,9 @@ class AbstractGevTrendTest(AbstractUnivariateTest):
def __init__(self, years, maxima, starting_year, unconstrained_model_class,
constrained_model_class=StationaryTemporalModel,
fit_method=AbstractTemporalLinearMarginModel.ISMEV_GEV_FIT_METHOD_STR):
):
super().__init__(years, maxima, starting_year)
self.fit_method = fit_method
self.fit_method = TemporalMarginFitMethod.is_mev_gev_fit
# Load observations, coordinates and datasets
self.coordinates, self.dataset = load_temporal_coordinates_and_dataset(maxima, years)
try:
......
......@@ -4,11 +4,11 @@ import numpy as np
from experiment.trend_analysis.univariate_test.abstract_gev_trend_test import AbstractGevTrendTest
from extreme_fit.model.margin_model.linear_margin_model.abstract_temporal_linear_margin_model import \
AbstractTemporalLinearMarginModel
AbstractTemporalLinearMarginModel, TemporalMarginFitMethod
from root_utils import NB_CORES
def compute_gev_change_point_test_result(smooth_maxima, starting_year, trend_test_class, years, fit_method=AbstractTemporalLinearMarginModel.ISMEV_GEV_FIT_METHOD_STR):
def compute_gev_change_point_test_result(smooth_maxima, starting_year, trend_test_class, years, fit_method=TemporalMarginFitMethod.is_mev_gev_fit):
trend_test = trend_test_class(years, smooth_maxima, starting_year) # type: AbstractGevTrendTest
assert isinstance(trend_test, AbstractGevTrendTest)
return trend_test.test_trend_type, \
......
from enum import Enum
import numpy as np
import pandas as pd
from extreme_fit.distribution.gev.gev_params import GevParams
from extreme_fit.model.margin_model.linear_margin_model.linear_margin_model import LinearMarginModel
from extreme_fit.model.result_from_model_fit.abstract_result_from_model_fit import AbstractResultFromModelFit
from extreme_fit.model.result_from_model_fit.result_from_extremes import ResultFromExtremes
from extreme_fit.model.result_from_model_fit.result_from_extremes import ResultFromExtremes, ResultFromBayesianExtremes
from extreme_fit.model.result_from_model_fit.result_from_ismev import ResultFromIsmev
from extreme_fit.model.utils import r, ro, get_null, get_margin_formula_extremes, get_coord, get_coord_df
from extreme_fit.model.utils import safe_run_r_estimator
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
class TemporalMarginFitMethod(Enum):
is_mev_gev_fit = 0
extremes_fevd_bayesian = 1
extremes_fevd_mle = 2
class AbstractTemporalLinearMarginModel(LinearMarginModel):
"""Linearity only with respect to the temporal coordinates"""
ISMEV_GEV_FIT_METHOD_STR = 'isMev.gev.fit'
EXTREMES_FEVD_BAYESIAN_FIT_METHOD_STR = 'extRemes.fevd.Bayesian'
def __init__(self, coordinates: AbstractCoordinates, use_start_value=False, params_start_fit=None,
params_sample=None, starting_point=None, fit_method=ISMEV_GEV_FIT_METHOD_STR):
params_sample=None, starting_point=None, fit_method=TemporalMarginFitMethod.is_mev_gev_fit):
super().__init__(coordinates, use_start_value, params_start_fit, params_sample, starting_point)
assert fit_method in [self.ISMEV_GEV_FIT_METHOD_STR, self.EXTREMES_FEVD_BAYESIAN_FIT_METHOD_STR]
assert isinstance(fit_method, TemporalMarginFitMethod)
self.fit_method = fit_method
def fitmargin_from_maxima_gev(self, data: np.ndarray, df_coordinates_spat: pd.DataFrame,
df_coordinates_temp: pd.DataFrame) -> AbstractResultFromModelFit:
assert data.shape[1] == len(df_coordinates_temp.values)
x = ro.FloatVector(data[0])
if self.fit_method == self.ISMEV_GEV_FIT_METHOD_STR:
if self.fit_method == TemporalMarginFitMethod.is_mev_gev_fit:
return self.ismev_gev_fit(x, df_coordinates_temp)
if self.fit_method == self.EXTREMES_FEVD_BAYESIAN_FIT_METHOD_STR:
if self.fit_method == TemporalMarginFitMethod.extremes_fevd_bayesian:
return self.extremes_fevd_bayesian_fit(x, df_coordinates_temp)
if self.fit_method == TemporalMarginFitMethod.extremes_fevd_mle:
return self.extremes_fevd_mle_fit(x, df_coordinates_temp)
# Gev Fit with isMev package
......@@ -41,12 +52,22 @@ class AbstractTemporalLinearMarginModel(LinearMarginModel):
# Gev fit with extRemes package
def extremes_fevd_mle_fit(self, x, df_coordinates_temp) -> ResultFromExtremes:
r_type_argument_kwargs, y = self.extreme_arguments(df_coordinates_temp)
res = safe_run_r_estimator(function=r('fevd_fixed'),
x=x,
data=y,
method='MLE',
**r_type_argument_kwargs
)
return ResultFromExtremes(res, self.margin_function_start_fit.gev_param_name_to_dims)
def extremes_fevd_bayesian_fit(self, x, df_coordinates_temp) -> ResultFromExtremes:
# Disable the use of log sigma parametrization
r_type_argument_kwargs = {'use.phi': False,
'verbose': False}
r_type_argument_kwargs.update(get_margin_formula_extremes(self.margin_function_start_fit.form_dict))
y = get_coord_df(df_coordinates_temp)
r_type_argument_kwargs, y = self.extreme_arguments(df_coordinates_temp)
# Assert for any non-stationary model that the shape parameter is constant
# (because the prior function considers that the last parameter should be the shape)
assert GevParams.SHAPE not in self.margin_function_start_fit.gev_param_name_to_dims \
or len(self.margin_function_start_fit.gev_param_name_to_dims[GevParams.SHAPE]) == 1
res = safe_run_r_estimator(function=r('fevd_fixed'),
x=x,
data=y,
......@@ -56,7 +77,16 @@ class AbstractTemporalLinearMarginModel(LinearMarginModel):
iter=5000,
**r_type_argument_kwargs
)
return ResultFromExtremes(res, self.margin_function_start_fit.gev_param_name_to_dims)
return ResultFromBayesianExtremes(res, self.margin_function_start_fit.gev_param_name_to_dims)
def extreme_arguments(self, df_coordinates_temp):
# Disable the use of log sigma parametrization
r_type_argument_kwargs = {'use.phi': False,
'verbose': False}
# Load parameters
r_type_argument_kwargs.update(get_margin_formula_extremes(self.margin_function_start_fit.form_dict))
y = get_coord_df(df_coordinates_temp)
return r_type_argument_kwargs, y
# Default arguments for all methods
......
......@@ -11,16 +11,25 @@ from extreme_fit.model.utils import r
class ResultFromExtremes(AbstractResultFromModelFit):
def __init__(self, result_from_fit: robjects.ListVector, gev_param_name_to_dim=None,
burn_in_percentage=0.1) -> None:
def __init__(self, result_from_fit: robjects.ListVector, gev_param_name_to_dim=None) -> None:
super().__init__(result_from_fit)
self.burn_in_percentage = burn_in_percentage
self.gev_param_name_to_dim = gev_param_name_to_dim
def load_dataframe_from_r_matrix(self, k):
r_matrix = self.name_to_value[k]
def load_dataframe_from_r_matrix(self, name):
r_matrix = self.name_to_value[name]
return pd.DataFrame(np.array(r_matrix), columns=r.colnames(r_matrix))
class ResultFromBayesianExtremes(ResultFromExtremes):
def __init__(self, result_from_fit: robjects.ListVector, gev_param_name_to_dim=None,
burn_in_percentage=0.5) -> None:
super().__init__(result_from_fit, gev_param_name_to_dim)
self.burn_in_percentage = burn_in_percentage
def burn_in_nb(self, df_all_samples):
return int(self.burn_in_percentage * len(df_all_samples))
@property
def chain_info(self):
return self.load_dataframe_from_r_matrix('chain.info')
......@@ -32,9 +41,7 @@ class ResultFromExtremes(AbstractResultFromModelFit):
@property
def df_posterior_samples(self) -> pd.DataFrame:
df_all_samples = pd.concat([self.results.iloc[:, :-1], self.chain_info.iloc[:, -2:]], axis=1)
# Remove the burn in period
burn_in_last_index = int(self.burn_in_percentage * len(df_all_samples))
df_posterior_samples = df_all_samples.iloc[burn_in_last_index:, :]
df_posterior_samples = df_all_samples.iloc[self.burn_in_nb(df_all_samples):, :]
return df_posterior_samples
def get_coef_dict_from_posterior_sample(self, s: pd.Series):
......@@ -47,44 +54,3 @@ class ResultFromExtremes(AbstractResultFromModelFit):
""" It is the coef for the margin function corresponding to the mean posterior parameters """
mean_posterior_parameters = self.df_posterior_samples.iloc[:, :-2].mean(axis=0)
return self.get_coef_dict_from_posterior_sample(mean_posterior_parameters)
# @property
# def
# @property
# def margin_coef_dict(self):
# assert self.gev_param_name_to_dim is not None
# # Build the Coeff dict from gev_param_name_to_dim
# coef_dict = {}
# i = 0
# mle_values = self.name_to_value['mle']
# for gev_param_name in GevParams.PARAM_NAMES:
# # Add intercept
# intercept_coef_name = LinearCoef.coef_template_str(gev_param_name, LinearCoef.INTERCEPT_NAME).format(1)
# coef_dict[intercept_coef_name] = mle_values[i]
# i += 1
# # Add a potential linear temporal trend
# if gev_param_name in self.gev_param_name_to_dim:
# temporal_coef_name = LinearCoef.coef_template_str(gev_param_name,
# AbstractCoordinates.COORDINATE_T).format(1)
# coef_dict[temporal_coef_name] = mle_values[i]
# i += 1
# return coef_dict
# @property
# def all_parameters(self):
# return self.margin_coef_dict
#
# @property
# def nllh(self):
# return convertFloatVector_to_float(self.name_to_value['nllh'])
#
# @property
# def deviance(self):
# return - 2 * self.nllh
#
# @property
# def convergence(self) -> str:
# return convertFloatVector_to_float(self.name_to_value['conv']) == 0
import unittest
from collections import OrderedDict
from experiment.eurocode_data.eurocode_return_level_uncertainties import EurocodeLevelUncertaintyFromExtremes
from experiment.eurocode_data.eurocode_visualizer import plot_model_name_to_dep_to_ordered_return_level_uncertainties
from experiment.eurocode_data.massif_name_to_departement import DEPARTEMENT_TYPES
from experiment.eurocode_data.utils import EUROCODE_ALTITUDES
class TestCoordinateSensitivity(unittest.TestCase):
DISPLAY = False
def test_departement_eurocode_plot(self):
# Create an example
example1 = EurocodeLevelUncertaintyFromExtremes(posterior_mean=1.0,
poster_uncertainty_interval=(0.5, 1.25))
example2 = EurocodeLevelUncertaintyFromExtremes(posterior_mean=0.2,
poster_uncertainty_interval=(0.1, 0.35))
example3 = EurocodeLevelUncertaintyFromExtremes(posterior_mean=0.4,
poster_uncertainty_interval=(0.25, 0.6))
altitude_examples = EUROCODE_ALTITUDES[:2]
dep_to_model_name_toreturn_level_uncertainty = {
dep: {"example1": [example1 for _ in altitude_examples],
"example2": [example2 for _ in altitude_examples],
"example3": [example3 for _ in altitude_examples],
} for dep in DEPARTEMENT_TYPES
}
plot_model_name_to_dep_to_ordered_return_level_uncertainties(dep_to_model_name_toreturn_level_uncertainty,
altitudes=altitude_examples,
show=self.DISPLAY)
if __name__ == '__main__':
unittest.main()
......@@ -7,7 +7,7 @@ from experiment.trend_analysis.univariate_test.abstract_gev_trend_test import fi
from extreme_fit.distribution.gev.gev_params import GevParams
from extreme_fit.estimator.margin_estimator.abstract_margin_estimator import LinearMarginEstimator
from extreme_fit.model.margin_model.linear_margin_model.abstract_temporal_linear_margin_model import \
AbstractTemporalLinearMarginModel
AbstractTemporalLinearMarginModel, TemporalMarginFitMethod
from extreme_fit.model.margin_model.linear_margin_model.temporal_linear_margin_models import StationaryTemporalModel, \
NonStationaryLocationTemporalModel, NonStationaryLocationAndScaleTemporalModel
from extreme_fit.model.result_from_model_fit.result_from_extremes import ResultFromExtremes
......@@ -38,14 +38,14 @@ class TestGevTemporalBayesian(unittest.TestCase):
df2 = pd.DataFrame(data=np.array(r['x_gev']), index=df.index)
observations = AbstractSpatioTemporalObservations(df_maxima_gev=df2)
self.dataset = AbstractDataset(observations=observations, coordinates=self.coordinates)
self.fit_method = AbstractTemporalLinearMarginModel.EXTREMES_FEVD_BAYESIAN_FIT_METHOD_STR
self.fit_method = TemporalMarginFitMethod.extremes_fevd_bayesian
def test_gev_temporal_margin_fit_stationary(self):
# Create estimator
estimator_fitted = fitted_linear_margin_estimator(StationaryTemporalModel, self.coordinates, self.dataset,
starting_year=0,
fit_method=AbstractTemporalLinearMarginModel.EXTREMES_FEVD_BAYESIAN_FIT_METHOD_STR)
ref = {'loc': 0.30440183718071845, 'scale': 1.301639258861012, 'shape': 0.303652401064773}
fit_method=self.fit_method)
ref = {'loc': 0.34272436381693616, 'scale': 1.3222588712831973, 'shape': 0.30491484962825105}
for year in range(1, 3):
mle_params_estimated = estimator_fitted.margin_function_from_fit.get_gev_params(np.array([year])).to_dict()
for key in ref.keys():
......@@ -55,7 +55,7 @@ class TestGevTemporalBayesian(unittest.TestCase):
# Create estimator
estimator = fitted_linear_margin_estimator(NonStationaryLocationTemporalModel, self.coordinates, self.dataset,
starting_year=0,
fit_method=AbstractTemporalLinearMarginModel.EXTREMES_FEVD_BAYESIAN_FIT_METHOD_STR)
fit_method=self.fit_method)
mu1_values = estimator.result_from_model_fit.df_posterior_samples.iloc[:, 1]
self.assertTrue((mu1_values != 0).any())
# Checks that parameters returned are indeed different
......@@ -67,7 +67,7 @@ class TestGevTemporalBayesian(unittest.TestCase):
# Create estimator
estimator = fitted_linear_margin_estimator(NonStationaryLocationAndScaleTemporalModel, self.coordinates, self.dataset,
starting_year=0,
fit_method=AbstractTemporalLinearMarginModel.EXTREMES_FEVD_BAYESIAN_FIT_METHOD_STR)
fit_method=self.fit_method)
mu1_values = estimator.result_from_model_fit.df_posterior_samples.iloc[:, 1]
self.assertTrue((mu1_values != 0).any())
# Checks that parameters returned are indeed different
......@@ -75,35 +75,6 @@ class TestGevTemporalBayesian(unittest.TestCase):
mle_params_estimated_year3 = estimator.margin_function_from_fit.get_gev_params(np.array([3])).to_dict()
self.assertNotEqual(mle_params_estimated_year1, mle_params_estimated_year3)
#
# def test_gev_temporal_margin_fit_nonstationary_with_start_point(self):
# # Create estimator
# estimator = self.fit_non_stationary_estimator(starting_point=3)
# self.assertNotEqual(estimator.margin_function_from_fit.mu1_temporal_trend, 0.0)
# # Checks starting point parameter are well passed
# self.assertEqual(3, estimator.margin_function_from_fit.starting_point)
# # Checks that parameters returned are indeed different
# mle_params_estimated_year1 = estimator.margin_function_from_fit.get_gev_params(np.array([1])).to_dict()
# mle_params_estimated_year3 = estimator.margin_function_from_fit.get_gev_params(np.array([3])).to_dict()
# self.assertEqual(mle_params_estimated_year1, mle_params_estimated_year3)
# mle_params_estimated_year5 = estimator.margin_function_from_fit.get_gev_params(np.array([5])).to_dict()
# self.assertNotEqual(mle_params_estimated_year5, mle_params_estimated_year3)
#
# def fit_non_stationary_estimator(self, starting_point):
# margin_model = NonStationaryLocationStationModel(self.coordinates,
# starting_point=starting_point + self.start_year)
# estimator = LinearMarginEstimator(self.dataset, margin_model)
# estimator.fit()
# return estimator
#
# def test_two_different_starting_points(self):
# # Create two different estimators
# estimator1 = self.fit_non_stationary_estimator(starting_point=3)
# estimator2 = self.fit_non_stationary_estimator(starting_point=28)
# mu1_estimator1 = estimator1.margin_function_from_fit.mu1_temporal_trend
# mu1_estimator2 = estimator2.margin_function_from_fit.mu1_temporal_trend
# self.assertNotEqual(mu1_estimator1, mu1_estimator2)
if __name__ == '__main__':
unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment