Newer
Older
Le Roux Erwan
committed
import datetime
import math
import time
from itertools import chain
from multiprocessing import Pool
import numpy as np
from cached_property import cached_property
from scipy.stats import chi2
from sklearn.utils import resample
Le Roux Erwan
committed
from extreme_fit.distribution.gev.gev_params import GevParams
Le Roux Erwan
committed
from extreme_fit.distribution.gumbel.gumbel_gof import goodness_of_fit_anderson
from extreme_fit.estimator.margin_estimator.utils import fitted_linear_margin_estimator_short
Le Roux Erwan
committed
from extreme_fit.function.param_function.polynomial_coef import PolynomialAllCoef, PolynomialCoef
Le Roux Erwan
committed
from extreme_fit.model.margin_model.linear_margin_model.temporal_linear_margin_models import StationaryTemporalModel
Le Roux Erwan
committed
from extreme_fit.model.margin_model.polynomial_margin_model.gev_altitudinal_models import StationaryAltitudinal
Le Roux Erwan
committed
from extreme_fit.model.margin_model.polynomial_margin_model.gev_altitudinal_models_only_altitude_and_scale import \
AltitudinalOnlyScale, StationaryAltitudinalOnlyScale
Le Roux Erwan
committed
from extreme_fit.model.margin_model.polynomial_margin_model.gumbel_altitudinal_models import \
StationaryGumbelAltitudinal, AbstractGumbelAltitudinalModel
Le Roux Erwan
committed
from extreme_fit.model.margin_model.polynomial_margin_model.models_based_on_pariwise_analysis.gev_with_linear_shape_wrt_altitude import \
AltitudinalShapeLinearTimeStationary
from extreme_fit.model.margin_model.utils import MarginFitMethod
from extreme_fit.model.result_from_model_fit.result_from_extremes.abstract_extract_eurocode_return_level import \
AbstractExtractEurocodeReturnLevel
Le Roux Erwan
committed
from extreme_fit.model.result_from_model_fit.result_from_extremes.confidence_interval_method import \
ConfidenceIntervalMethodFromExtremes
from extreme_fit.model.result_from_model_fit.result_from_extremes.eurocode_return_level_uncertainties import \
EurocodeConfidenceIntervalFromExtremes
Le Roux Erwan
committed
from extreme_trend.one_fold_fit.altitude_group import DefaultAltitudeGroup, altitudes_for_groups
from root_utils import NB_CORES, batch
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
Le Roux Erwan
committed
from spatio_temporal_dataset.coordinates.temporal_coordinates.abstract_temporal_covariate_for_fit import \
TimeTemporalCovariate
Le Roux Erwan
committed
from spatio_temporal_dataset.coordinates.temporal_coordinates.temperature_covariate import \
Le Roux Erwan
committed
AnomalyTemperatureWithSplineTemporalCovariate
Le Roux Erwan
committed
from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset
from spatio_temporal_dataset.spatio_temporal_observations.annual_maxima_observations import AnnualMaxima
class OneFoldFit(object):
SIGNIFICANCE_LEVEL = 0.05
return_period = 100
Le Roux Erwan
committed
quantile_level = 1 - (1 / return_period)
Le Roux Erwan
committed
def __init__(self, massif_name: str, dataset: AbstractDataset, models_classes,
Le Roux Erwan
committed
first_year, last_year,
fit_method=MarginFitMethod.extremes_fevd_mle,
temporal_covariate_for_fit=None,
Le Roux Erwan
committed
altitude_group=None,
only_models_that_pass_goodness_of_fit_test=True,
confidence_interval_based_on_delta_method=False,
Le Roux Erwan
committed
remove_physically_implausible_models=False,
climate_coordinates_with_effects=None
):
Le Roux Erwan
committed
self.first_year = first_year
self.last_year = last_year
self.years_of_difference = last_year - first_year
Le Roux Erwan
committed
self.remove_physically_implausible_models = remove_physically_implausible_models
self.confidence_interval_based_on_delta_method = confidence_interval_based_on_delta_method
self.only_models_that_pass_goodness_of_fit_test = only_models_that_pass_goodness_of_fit_test
Le Roux Erwan
committed
self.altitude_group = altitude_group
self.massif_name = massif_name
self.dataset = dataset
self.models_classes = models_classes
self.fit_method = fit_method
Le Roux Erwan
committed
self.temporal_covariate_for_fit = temporal_covariate_for_fit
self.climate_coordinates_with_effects = climate_coordinates_with_effects
# Fit Estimators
self.model_class_to_estimator = {}
for model_class in models_classes:
self.model_class_to_estimator[model_class] = self.fitted_linear_margin_estimator(model_class, self.dataset)
Le Roux Erwan
committed
# Compute sorted estimators indirectly
_ = self.has_at_least_one_valid_model
def fitted_linear_margin_estimator(self, model_class, dataset):
return fitted_linear_margin_estimator_short(model_class=model_class,
dataset=dataset,
fit_method=self.fit_method,
temporal_covariate_for_fit=self.temporal_covariate_for_fit,
drop_duplicates=False,
climate_coordinates_with_effects=self.climate_coordinates_with_effects)
Le Roux Erwan
committed
@classmethod
def get_moment_str(cls, order):
if order == 1:
return 'mean'
Le Roux Erwan
committed
elif order == 2:
return 'std'
Le Roux Erwan
committed
elif order is None:
return '{}-year return levels'.format(cls.return_period)
Le Roux Erwan
committed
Le Roux Erwan
committed
def get_moment_for_plots(self, altitudes, order=1, covariate_before=None, covariate_after=None):
return [self.get_moment(altitudes[0], covariate_after, order)]
Le Roux Erwan
committed
def get_moment(self, altitude, temporal_covariate, order=1):
gev_params = self.get_gev_params(altitude, temporal_covariate)
if order == 1:
return gev_params.mean
elif order == 2:
return gev_params.std
Le Roux Erwan
committed
elif order is None:
return gev_params.return_level(return_period=self.return_period)
Le Roux Erwan
committed
elif order in GevParams.PARAM_NAMES:
return gev_params.to_dict()[order]
else:
raise NotImplementedError
def get_gev_params(self, altitude, year):
Le Roux Erwan
committed
coordinate = self.get_coordinate(altitude, year)
gev_params = self.best_margin_function_from_fit.get_params(coordinate, is_transformed=False)
return gev_params
Le Roux Erwan
committed
def moment(self, altitudes, order=1):
Le Roux Erwan
committed
return [self.get_moment(altitude, self.covariate_after, order) for altitude in altitudes]
@property
def change_in_return_level_for_reference_altitude(self) -> float:
return self.changes_of_moment(altitudes=[self.altitude_plot], order=None)[0]
@property
def relative_change_in_return_level_for_reference_altitude(self) -> float:
return self.relative_changes_of_moment(altitudes=[self.altitude_plot], order=None)[0]
def changes_of_moment(self, altitudes, order=1, covariate_before=None, covariate_after=None):
covariate_after, covariate_before = self.set_covariate_before_and_after(covariate_after, covariate_before)
changes = []
for altitude in altitudes:
mean_after = self.get_moment(altitude, covariate_after, order)
mean_before = self.get_moment(altitude, covariate_before, order)
change = mean_after - mean_before
changes.append(change)
return changes
def set_covariate_before_and_after(self, covariate_after, covariate_before):
if covariate_before is None:
covariate_before = self.covariate_before
if covariate_after is None:
covariate_after = self.covariate_after
return covariate_after, covariate_before
Le Roux Erwan
committed
@property
def covariate_before(self):
return self._covariate_before_and_after[0]
@property
def covariate_after(self):
return self._covariate_before_and_after[1]
@property
def _covariate_before_and_after(self):
if self.temporal_covariate_for_fit in [None, TimeTemporalCovariate]:
Le Roux Erwan
committed
return self.first_year, self.last_year
Le Roux Erwan
committed
elif self.temporal_covariate_for_fit is AnomalyTemperatureWithSplineTemporalCovariate:
Le Roux Erwan
committed
# In 2020, we are roughly at 1 degree. Thus it natural to see the augmentation from 1 to 2 degree.
Le Roux Erwan
committed
return 1, 2
Le Roux Erwan
committed
else:
raise NotImplementedError
Le Roux Erwan
committed
@property
def between_covariate_str(self):
d_temperature = {'C': '{C}'}
Le Roux Erwan
committed
s = ' between +${}^o\mathrm{C}$ and +${}^o\mathrm{C}$' if self.temporal_covariate_for_fit is AnomalyTemperatureWithSplineTemporalCovariate \
Le Roux Erwan
committed
else ' between {} and {}'
s = s.format(self.covariate_before, self.covariate_after,
**d_temperature)
Le Roux Erwan
committed
return s
def relative_changes_of_moment(self, altitudes, order=1, covariate_before=None, covariate_after=None):
covariate_after, covariate_before = self.set_covariate_before_and_after(covariate_after, covariate_before)
relative_changes = []
for altitude in altitudes:
mean_after = self.get_moment(altitude, covariate_after, order)
mean_before = self.get_moment(altitude, covariate_before, order)
Le Roux Erwan
committed
relative_change = 100 * (mean_after - mean_before) / np.abs(mean_before)
relative_changes.append(relative_change)
return relative_changes
# Minimizing the AIC and some properties
@cached_property
Le Roux Erwan
committed
def sorted_estimators_with_aic(self):
return self._sorted_estimators_with_method_name(method_name='aic')
def method_name_to_best_estimator(self, method_names):
return {self._sorted_estimators_with_method_name(method_name) for method_name in method_names}
def _sorted_estimators_with_method_name(self, method_name):
estimators = self.estimators_quality_checked
try:
sorted_estimators = sorted([estimator for estimator in estimators],
key=lambda e: e.__getattribute__(method_name))
except AssertionError as e:
print('Error for:\n', self.massif_name, self.altitude_group)
raise e
return sorted_estimators
@cached_property
def estimators_quality_checked(self):
estimators = list(self.model_class_to_estimator.values())
Le Roux Erwan
committed
if self.remove_physically_implausible_models:
Le Roux Erwan
committed
# Remove wrong shape
Le Roux Erwan
committed
estimators = [e for e in estimators if -0.5 < self._compute_shape_for_reference_altitude(e) < 0.5]
Le Roux Erwan
committed
# Remove models with undefined parameters for the coordinate of interest
well_defined_estimators = []
for e in estimators:
Le Roux Erwan
committed
coordinate_values_for_the_fit = e.coordinates_for_nllh
Le Roux Erwan
committed
if isinstance(self.altitude_group, DefaultAltitudeGroup):
coordinate_values_for_the_result = []
else:
coordinate_values_for_the_result = [np.array([self.altitude_group.reference_altitude, c])
for c in self._covariate_before_and_after]
coordinate_values_to_check = list(coordinate_values_for_the_fit) + coordinate_values_for_the_result
has_undefined_parameters = False
for coordinate in coordinate_values_to_check:
gev_params = e.margin_function_from_fit.get_params(coordinate)
if gev_params.has_undefined_parameters:
has_undefined_parameters = True
break
if not has_undefined_parameters:
well_defined_estimators.append(e)
Le Roux Erwan
committed
else:
print('undefined', e)
estimators = well_defined_estimators
Le Roux Erwan
committed
if len(estimators) == 0:
print(self.massif_name, " has only implausible models")
# Apply the goodness of fit
if self.only_models_that_pass_goodness_of_fit_test:
Le Roux Erwan
committed
estimators = [e for e in estimators if self.goodness_of_fit_test(e)]
if not (self.remove_physically_implausible_models or self.only_models_that_pass_goodness_of_fit_test):
Le Roux Erwan
committed
assert len(estimators) == len(self.models_classes)
return estimators
Le Roux Erwan
committed
def get_coordinate(self, altitude, year):
Le Roux Erwan
committed
if isinstance(self.altitude_group, DefaultAltitudeGroup):
Le Roux Erwan
committed
if not isinstance(altitude, list):
coordinate = np.array([year])
else:
coordinate = [year] + altitude
Le Roux Erwan
committed
else:
Le Roux Erwan
committed
coordinate = np.array([altitude, year])
return coordinate
def _compute_shape_for_reference_altitude(self, estimator):
Le Roux Erwan
committed
coordinate = self.get_coordinate(self.altitude_plot, self.covariate_after)
gev_params = estimator.margin_function_from_fit.get_params(coordinate, is_transformed=False)
Le Roux Erwan
committed
shape = gev_params.shape
return shape
Le Roux Erwan
committed
@property
def has_at_least_one_valid_model(self):
Le Roux Erwan
committed
return len(self.sorted_estimators_with_aic) > 0
Le Roux Erwan
committed
Le Roux Erwan
committed
@property
def model_class_to_estimator_with_finite_aic(self):
Le Roux Erwan
committed
return {type(estimator.margin_model): estimator for estimator in self.sorted_estimators_with_aic}
Le Roux Erwan
committed
@property
def best_estimator(self):
if self.has_at_least_one_valid_model:
Le Roux Erwan
committed
best_estimator = self.sorted_estimators_with_aic[0]
return best_estimator
Le Roux Erwan
committed
else:
raise ValueError('This object should not have been called because '
'has_at_least_one_valid_model={}'.format(self.has_at_least_one_valid_model))
Le Roux Erwan
committed
@property
Le Roux Erwan
committed
def best_margin_model(self):
return self.best_estimator.margin_model
@property
def best_margin_function_from_fit(self):
return self.best_estimator.margin_function_from_fit
@property
def best_shape(self):
Le Roux Erwan
committed
return self.get_gev_params(altitude=self.altitude_plot, year=self.last_year).shape
Le Roux Erwan
committed
@property
def altitude_plot(self):
return self.altitude_group.reference_altitude
Le Roux Erwan
committed
def best_coef(self, param_name, dim, degree):
try:
coef = self.best_margin_function_from_fit.param_name_to_coef[param_name] # type: PolynomialAllCoef
Le Roux Erwan
committed
if coef.dim_to_polynomial_coef is None:
return coef.intercept
else:
coef = coef.dim_to_polynomial_coef[dim] # type: PolynomialCoef
coef = coef.idx_to_coef[degree]
Le Roux Erwan
committed
return coef
except (TypeError, KeyError):
return None
@property
def model_names(self):
Le Roux Erwan
committed
return [e.margin_model.name_str for e in self.sorted_estimators_with_aic]
@property
def best_name(self):
name = self.best_estimator.margin_model.name_str
latex_command = 'textbf' if self.is_significant else 'textrm'
Le Roux Erwan
committed
best_name = '$\\' + latex_command + '{' + name + '}$'
if self.is_significant:
best_name = '\\underline{' + best_name + '}'
return best_name
# Significant
@property
def stationary_estimator(self):
Le Roux Erwan
committed
try:
if isinstance(self.best_estimator.margin_model, AbstractGumbelAltitudinalModel):
return self.model_class_to_estimator_with_finite_aic[StationaryGumbelAltitudinal]
elif isinstance(self.best_estimator.margin_model, AltitudinalOnlyScale):
return self.model_class_to_estimator_with_finite_aic[StationaryAltitudinalOnlyScale]
elif isinstance(self.best_estimator.margin_model, AltitudinalShapeLinearTimeStationary):
return self.model_class_to_estimator_with_finite_aic[AltitudinalShapeLinearTimeStationary]
elif isinstance(self.best_estimator.margin_model, AltitudinalShapeLinearTimeStationary):
return self.model_class_to_estimator_with_finite_aic[AltitudinalShapeLinearTimeStationary]
Le Roux Erwan
committed
else:
Le Roux Erwan
committed
if isinstance(self.altitude_group, DefaultAltitudeGroup):
return self.model_class_to_estimator_with_finite_aic[StationaryTemporalModel]
else:
return self.model_class_to_estimator_with_finite_aic[StationaryAltitudinal]
except KeyError:
raise KeyError('A stationary model must be added either in the list of models, or here in the method')
@property
def likelihood_ratio(self):
Le Roux Erwan
committed
return self.stationary_estimator.deviance - self.best_estimator.deviance
@property
def degree_freedom_chi2(self):
return self.best_estimator.nb_params - self.stationary_estimator.nb_params
Le Roux Erwan
committed
@cached_property
def is_significant(self) -> bool:
Le Roux Erwan
committed
if isinstance(self.altitude_group, DefaultAltitudeGroup):
# Likelihood ratio based significance
print("likelihood ratio based significance")
stationary_model_classes = [StationaryAltitudinal, StationaryGumbelAltitudinal,
AltitudinalShapeLinearTimeStationary]
if any([isinstance(self.best_estimator.margin_model, c)
for c in stationary_model_classes]):
return False
else:
return self.likelihood_ratio > chi2.ppf(q=1 - self.SIGNIFICANCE_LEVEL, df=self.degree_freedom_chi2)
# Bootstrap based significance
Le Roux Erwan
committed
return self.cached_results_from_bootstrap[0]
Le Roux Erwan
committed
def sign_of_change(self, function_from_fit):
Le Roux Erwan
committed
for temporal_covariate in self._covariate_before_and_after:
coordinate = np.array([self.altitude_plot, temporal_covariate])
return_level = function_from_fit.get_params(
coordinate=coordinate,
is_transformed=False).return_level(return_period=self.return_period)
return_levels.append(return_level)
return 100 * (return_levels[1] - return_levels[0]) / return_levels[0]
def goodness_of_fit_test(self, estimator):
Le Roux Erwan
committed
quantiles = estimator.sorted_empirical_standard_gumbel_quantiles()
goodness_of_fit_anderson_test = goodness_of_fit_anderson(quantiles, self.SIGNIFICANCE_LEVEL)
return goodness_of_fit_anderson_test
Le Roux Erwan
committed
def standard_gumbel_quantiles(self, n=None):
Le Roux Erwan
committed
standard_gumbel_distribution = GevParams(loc=0, scale=1, shape=0)
if n is None:
n = len(self.dataset.coordinates)
Le Roux Erwan
committed
standard_gumbel_quantiles = [standard_gumbel_distribution.quantile(i / (n + 1)) for i in range(1, n + 1)]
return standard_gumbel_quantiles
def best_confidence_interval(self, altitude, year) -> EurocodeConfidenceIntervalFromExtremes:
Le Roux Erwan
committed
coordinate = self.get_coordinate(altitude, year)
if self.confidence_interval_based_on_delta_method:
EurocodeConfidenceIntervalFromExtremes.quantile_level = self.quantile_level
return EurocodeConfidenceIntervalFromExtremes.from_estimator_extremes(
estimator_extremes=self.best_estimator,
ci_method=ConfidenceIntervalMethodFromExtremes.ci_mle,
coordinate=coordinate)
else:
Le Roux Erwan
committed
key = (altitude, year)
mean_estimate = self.cached_results_from_bootstrap[1][key]
confidence_interval = self.cached_results_from_bootstrap[2][key]
return EurocodeConfidenceIntervalFromExtremes(mean_estimate, confidence_interval)
def get_return_level(self, function_from_fit, coordinate):
return function_from_fit.get_params(coordinate).return_level(self.return_period)
Le Roux Erwan
committed
@cached_property
def best_residuals(self):
Le Roux Erwan
committed
return self.best_estimator.sorted_empirical_standard_gumbel_quantiles()
@cached_property
Le Roux Erwan
committed
def cached_results_from_bootstrap(self):
start = time.time()
bootstrap_fitted_functions = self.bootstrap_fitted_functions_from_fit
end1 = time.time()
duration = str(datetime.timedelta(seconds=end1 - start))
print('Fit duration', duration)
# First result - Compute the significance
sign_of_changes = [self.sign_of_change(f) for f in bootstrap_fitted_functions]
if self.sign_of_change(self.best_margin_function_from_fit) > 0:
Le Roux Erwan
committed
is_significant = np.quantile(sign_of_changes, self.SIGNIFICANCE_LEVEL) > 0
else:
is_significant = np.quantile(sign_of_changes, 1 - self.SIGNIFICANCE_LEVEL) < 0
# Second result - Compute some dictionary for the return level
altitude_and_year_to_return_level_mean_estimate = {}
altitude_and_year_to_return_level_confidence_interval = {}
altitudes = altitudes_for_groups[self.altitude_group.group_id - 1]
years = [1959, 2019]
for year in years:
for altitude in altitudes:
key = (altitude, year)
Le Roux Erwan
committed
coordinate = self.get_coordinate(altitude, year)
mean_estimate = self.get_return_level(self.best_margin_function_from_fit, coordinate)
Le Roux Erwan
committed
bootstrap_return_levels = [self.get_return_level(f, coordinate) for f in
bootstrap_fitted_functions]
confidence_interval = tuple([np.quantile(bootstrap_return_levels, q)
for q in AbstractExtractEurocodeReturnLevel.bottom_and_upper_quantile])
altitude_and_year_to_return_level_mean_estimate[key] = mean_estimate
altitude_and_year_to_return_level_confidence_interval[key] = confidence_interval
return is_significant, altitude_and_year_to_return_level_mean_estimate, altitude_and_year_to_return_level_confidence_interval
@property
def return_level_last_temporal_coordinate(self):
Le Roux Erwan
committed
df_temporal_covariate = self.dataset.coordinates.df_temporal_coordinates_for_fit(
temporal_covariate_for_fit=self.temporal_covariate_for_fit,
drop_duplicates=False)
last_temporal_coordinate = df_temporal_covariate.loc[:, AbstractCoordinates.COORDINATE_T].max()
# todo: améliorer the last temporal coordinate. on recupère la liste des rights_limits, puis on prend la valeur juste au dessus ou égale."""
print('last temporal coordinate', last_temporal_coordinate)
altitude = self.altitude_group.reference_altitude
Le Roux Erwan
committed
coordinate = self.get_coordinate(altitude, last_temporal_coordinate)
return self.get_return_level(self.best_margin_function_from_fit, coordinate)
Le Roux Erwan
committed
@property
def bootstrap_fitted_functions_from_fit(self):
Le Roux Erwan
committed
print('nb of bootstrap for confidence interval=', AbstractExtractEurocodeReturnLevel.NB_BOOTSTRAP)
multiprocess = None
idxs = list(range(AbstractExtractEurocodeReturnLevel.NB_BOOTSTRAP))
if multiprocess is None:
start = time.time()
with Pool(NB_CORES) as p:
batchsize = math.ceil(AbstractExtractEurocodeReturnLevel.NB_BOOTSTRAP / NB_CORES)
list_functions_from_fit = p.map(self.fit_batch_bootstrap_estimator, batch(idxs, batchsize=batchsize))
functions_from_fit = list(chain.from_iterable(list_functions_from_fit))
elif multiprocess:
print('multiprocessing')
start = time.time()
with Pool(NB_CORES) as p:
Le Roux Erwan
committed
functions_from_fit = p.map(self.fit_one_bootstrap_estimator, idxs)
else:
Le Roux Erwan
committed
start = time.time()
functions_from_fit = []
Le Roux Erwan
committed
for idx in idxs:
estimator = self.fit_one_bootstrap_estimator(idx)
functions_from_fit.append(estimator)
Le Roux Erwan
committed
end1 = time.time()
duration = str(datetime.timedelta(seconds=end1 - start))
print('Multiprocessing duration', duration)
return functions_from_fit
Le Roux Erwan
committed
def fit_batch_bootstrap_estimator(self, idxs):
list_function_from_fit = []
for idx in idxs:
list_function_from_fit.append(self.fit_one_bootstrap_estimator(idx))
return list_function_from_fit
def fit_one_bootstrap_estimator(self, idx):
resample_residuals = resample(self.best_residuals)
coordinate_values_to_maxima = self.best_estimator. \
coordinate_values_to_maxima_from_standard_gumbel_quantiles(standard_gumbel_quantiles=resample_residuals)
coordinates = self.dataset.coordinates
observations = AnnualMaxima.from_coordinates(coordinates, coordinate_values_to_maxima)
dataset = AbstractDataset(observations=observations, coordinates=coordinates)
model_class = type(self.best_margin_model)
Le Roux Erwan
committed
function_from_fit = self.fitted_linear_margin_estimator(model_class, dataset).margin_function_from_fit
Le Roux Erwan
committed
return function_from_fit