Commit db59c605 authored by Le Roux Erwan's avatar Le Roux Erwan
Browse files

[EXTREME ESTIMATOR] add test confidence interval. Add confidence interval with...

[EXTREME ESTIMATOR] add test confidence interval. Add confidence interval with the ci method in the Bayesian case. add test for the stationary case. put the division by 100 (to get snow load) in eurocode vizualizer
parent 7596a6dd
No related merge requests found
Showing with 356 additions and 32 deletions
+356 -32
from typing import Dict, List
from typing import Dict, List, Tuple
import matplotlib.pyplot as plt
from extreme_fit.model.result_from_model_fit.result_from_extremes.eurocode_return_level_uncertainties import EurocodeConfidenceIntervalFromExtremes
from extreme_fit.model.result_from_model_fit.result_from_extremes.eurocode_return_level_uncertainties import \
EurocodeConfidenceIntervalFromExtremes
from experiment.eurocode_data.massif_name_to_departement import massif_name_to_eurocode_region
from experiment.meteo_france_data.scm_models_data.visualization.utils import create_adjusted_axes
from root_utils import get_display_name_from_object_type
......@@ -23,6 +24,7 @@ def get_label_name(model_name, ci_method_name: str):
def get_model_name(model_class):
return get_display_name_from_object_type(model_class).split('Stationary')[0] + 'Stationary'
def plot_massif_name_to_model_name_to_uncertainty_method_to_ordered_dict(d, nb_massif_names, nb_model_names, show=True):
"""
Rows correspond to massif names
......@@ -62,17 +64,19 @@ def plot_label_to_ordered_return_level_uncertainties(ax, massif_name, model_name
eurocode_region = massif_name_to_eurocode_region[massif_name]()
# Display the return level from model class
for j, (color, (label, l)) in enumerate(zip(colors,label_to_ordered_return_level_uncertainties.items())):
altitudes, ordered_return_level_uncertaines = zip(*l)
for j, (color, (label, l)) in enumerate(zip(colors, label_to_ordered_return_level_uncertainties.items())):
l = list(zip(*l))
altitudes = l[0]
ordered_return_level_uncertaines = l[1] # type: List[EurocodeConfidenceIntervalFromExtremes]
# Plot eurocode standards only for the first loop
if j == 0:
eurocode_region.plot_max_loading(ax, altitudes=altitudes)
mean = [r.posterior_mean for r in ordered_return_level_uncertaines]
conversion_factor = 100 # We divide by 100 to transform the snow water equivalent into snow load
mean = [r.mean_estimate / conversion_factor for r in ordered_return_level_uncertaines]
ci_method_name = str(label).split('.')[1].replace('_', ' ')
ax.plot(altitudes, mean, '-', color=color, label=get_label_name(model_name, ci_method_name))
lower_bound = [r.poster_uncertainty_interval[0] for r in ordered_return_level_uncertaines]
upper_bound = [r.poster_uncertainty_interval[1] for r in ordered_return_level_uncertaines]
lower_bound = [r.confidence_interval[0] / conversion_factor for r in ordered_return_level_uncertaines]
upper_bound = [r.confidence_interval[1] / conversion_factor for r in ordered_return_level_uncertaines]
ax.fill_between(altitudes, lower_bound, upper_bound, color=color, alpha=alpha)
ax.legend(loc=2)
ax.set_ylim([0.0, 4.0])
......
......@@ -60,7 +60,7 @@ def main_drawing():
(NonStationaryLocationAndScaleTemporalModel, 2017),
][1:]
altitudes = EUROCODE_ALTITUDES[:]
uncertainty_methods = [ConfidenceIntervalMethodFromExtremes.my_bayes, ConfidenceIntervalMethodFromExtremes.bayes]
uncertainty_methods = [ConfidenceIntervalMethodFromExtremes.my_bayes, ConfidenceIntervalMethodFromExtremes.ci_bayes]
show = True
if fast_plot:
......
......@@ -6,7 +6,6 @@ library(SpatialExtremes)
# Define a custom Prior function
fevdPriorCustom <- function (theta, q, p, log = FALSE){
# Select the shape parameter (which is always the last parameter either for the stationary or the non-stationary case)
print(attributes(theta))
theta_names = attr(theta, 'names')
shape_idx = match('shape', theta_names)
shape = theta[shape_idx]
......
......@@ -112,3 +112,12 @@ class GevParams(AbstractParams):
indicator_name_to_value[quantile_name] = self.quantile(quantile_value)
assert all([a == b for a, b in zip(self.indicator_names(), indicator_name_to_value.keys())])
return indicator_name_to_value
@classmethod
def greek_letter_from_gev_param_name(cls, gev_param_name):
assert gev_param_name in cls.PARAM_NAMES
return {
cls.LOC: 'mu',
cls.SCALE: 'sigma',
cls.SHAPE: 'zeta',
}[gev_param_name]
\ No newline at end of file
......@@ -3,13 +3,13 @@
# Created by: erwan
# Created on: 04/10/2019
library(extRemes)
library(data.table)
library(stats4)
# library(data.table)
# library(stats4)
library(SpatialExtremes)
source('fevd_fixed.R')
# Sample from a GEV
set.seed(42)
N <- 1000
N <- 100
loc = 0; scale = 1; shape <- 0.1
x_gev <- rgev(N, loc = loc, scale = scale, shape = shape)
......@@ -29,9 +29,9 @@ x_gev <- rgev(N, loc = loc, scale = scale, shape = shape)
print(pbeta(1.0, 1, 1))
print(pbeta(0.5, 1, 1))
print(fevdPriorCustom(2.0, 0.0, 0.0))
# print(pbeta(1.0, 1, 1))
# print(pbeta(0.5, 1, 1))
# print(fevdPriorCustom(2.0, 0.0, 0.0))
......@@ -46,20 +46,55 @@ coord <- matrix(ncol=1, nrow = N)
coord[,1]=seq(1,N,1)
colnames(coord) = c("T")
coord = data.frame(coord, stringsAsFactors = TRUE)
res = fevd_fixed(x_gev, data=coord, location.fun= ~T, method='Bayesian', priorFun="fevdPriorCustom", priorParams=list(q=c(6), p=c(9)), iter=5000, verbose=TRUE, use.phi=FALSE)
print(res)
print('here')
print(res$constant.loc)
print('here2')
print(res$method)
print(res$priorFun)
print(res$priorParams)
m = res$results
print(class(res$chain.info))
print(dim(m))
print(m[1,])
print(m[1,1])
print(res$chain.info[1,])
# res = fevd_fixed(x_gev, data=coord, location.fun= ~T, scale.fun= ~T, method='Bayesian', priorFun="fevdPriorCustom", priorParams=list(q=c(6), p=c(9)), iter=5000, verbose=TRUE, use.phi=FALSE)
res = fevd_fixed(x_gev, data=coord, method='Bayesian', priorFun="fevdPriorCustom", priorParams=list(q=c(6), p=c(9)), iter=5000, verbose=TRUE, use.phi=FALSE)
# Some display for the results
# print(res)
# print('here')
# print(res$constant.loc)
# print('here2')
# print(res$method)
# print(res$priorFun)
# print(res$priorParams)
# m = res$results
# print(class(res$chain.info))
# print(dim(m))
# print(m[1,])
# print(m[1,1])
# print(res$chain.info[1,])
# Confidence interval
res_ci = ci(res, alpha = 0.05, type = c("return.level"),
return.period = 50, FUN = "mean", tscale = FALSE)
print(res_ci)
# print(attributes(res_ci))
# print(dimnames(res_ci))
# print(res_ci[1])
# covariates = c(1.0, 100.0, 1000.0)
# v = make.qcov(res, vals = list(mu1 = covariates, sigma1 = covariates))
# v = make.qcov(res, vals = list(mu1 = c(0.0), sigma1 = c(0.0)))
# res_find = findpars(res, FUN = "mean" , tscale = FALSE, qcov = v,
# burn.in=4998)
# print(res_find)
#
# res_ci = ci(res, alpha = 0.05, type = c("return.level"),
# burn.in=4998,
# return.period = 50, FUN = "mean", tscale = FALSE, qcov=v)
# print(res_ci)
#
# r = qgev(0.98, 0.1427229 , 1.120554, -0.1008511)
# print(r)
# print(attributes(res_ci))
# print(dimnames(res_ci))
# print(res_ci[1])
# print(attr(res_ci), '')
# Title : TODO
# Objective : TODO
# Created by: erwan
# Created on: 04/10/2019
library(extRemes)
library(data.table)
library(stats4)
library(SpatialExtremes)
source('fevd_fixed.R')
# Sample from a GEV
set.seed(42)
N <- 100
loc = 0; scale = 1; shape <- 0.1
x_gev <- rgev(N, loc = loc, scale = scale, shape = shape)
# Add covariate
coord <- matrix(ncol=1, nrow = N)
coord[,1]=seq(1,N,1)
colnames(coord) = c("T")
coord = data.frame(coord, stringsAsFactors = TRUE)
# res = fevd_fixed(x_gev, data=coord, method='MLE', verbose=TRUE, use.phi=FALSE)
res = fevd_fixed(x_gev, data=coord, location.fun= ~T, method='MLE', verbose=TRUE, use.phi=FALSE, time.units = "years", units = "years")
# print(res)
# Some display for the results
# m = res$results
# print(class(res$chain.info))
# print(dim(m))
# print(m)
print(res$results$par)
# print(res$par)
# print(m[1])
# Confidence interval staionary
# method = "proflik"
res_ci = ci(res, alpha = 0.05, type = c("return.level", "parameter"),
return.period = 50, method = method, xrange = NULL, nint = 20, verbose = FALSE,
tscale = FALSE, return.samples = FALSE)
print(res_ci)
# Bug to solve for the non stationary - the returned parameter do not match with the return level
# ci.fevd.mle()
# Confidence interval non staionary
# v = make.qcov(res, vals = list(mu1 = c(0.0)))
# r = return.level(res, return.period = 50, qcov = v)
# print(r)
# param = findpars(res, qcov = v)
# print(param)
# res_ci = ci(res, alpha = 0.05, type = c("return.level", "parameter"),
# return.period = 50, method = "normal", xrange = NULL, nint = 20, verbose = FALSE,
# tscale = FALSE, return.samples = FALSE, qcov=v)
# print(res_ci)
#
#
......@@ -6,6 +6,7 @@ from rpy2 import robjects
class AbstractResultFromModelFit(object):
def __init__(self, result_from_fit: robjects.ListVector) -> None:
self.result_from_fit = result_from_fit
if hasattr(result_from_fit, 'names'):
self.name_to_value = self.get_python_dictionary(result_from_fit)
else:
......
......@@ -10,6 +10,7 @@ from extreme_fit.estimator.utils import load_margin_function
from extreme_fit.model.margin_model.margin_function.linear_margin_function import LinearMarginFunction
from extreme_fit.model.result_from_model_fit.result_from_extremes.result_from_bayesian_extremes import \
ResultFromBayesianExtremes
from extreme_fit.model.result_from_model_fit.result_from_extremes.result_from_mle_extremes import ResultFromMleExtremes
class AbstractExtractEurocodeReturnLevel(object):
......@@ -28,9 +29,35 @@ class AbstractExtractEurocodeReturnLevel(object):
self.eurocode_quantile = EUROCODE_QUANTILE
self.alpha_for_confidence_interval = self.ALPHA_CONFIDENCE_INTERVAL_UNCERTAINTY
@property
def mean_estimate(self) -> np.ndarray:
raise NotImplementedError
@property
def confidence_interval(self):
raise NotImplementedError
class ExtractEurocodeReturnLevelFromCiMethod(AbstractExtractEurocodeReturnLevel):
pass
result_from_fit: ResultFromMleExtremes
def __init__(self, estimator: LinearMarginEstimator, ci_method,
year_of_interest: int = YEAR_OF_INTEREST_FOR_RETURN_LEVEL):
super().__init__(estimator, ci_method, year_of_interest)
@cached_property
def confidence_interval_method(self):
return self.result_from_fit.confidence_interval_method(self.eurocode_quantile,
self.alpha_for_confidence_interval,
self.year_of_interest)
@property
def mean_estimate(self) -> np.ndarray:
return self.confidence_interval_method[0]
@property
def confidence_interval(self):
return self.confidence_interval_method[1]
class ExtractEurocodeReturnLevelFromMyBayesianExtremes(AbstractExtractEurocodeReturnLevel):
......@@ -57,18 +84,18 @@ class ExtractEurocodeReturnLevelFromMyBayesianExtremes(AbstractExtractEurocodeRe
@cached_property
def posterior_eurocode_return_level_samples_for_year_of_interest(self) -> np.ndarray:
"""We divide by 100 to transform the snow water equivalent into snow load"""
return np.array(
[p.quantile(self.eurocode_quantile) for p in self.gev_params_from_fit_for_year_of_interest]) / 100
[p.quantile(self.eurocode_quantile) for p in self.gev_params_from_fit_for_year_of_interest])
@property
def posterior_mean_eurocode_return_level_for_the_year_of_interest(self) -> np.ndarray:
def mean_estimate(self) -> np.ndarray:
# Mean posterior value here
return np.mean(self.posterior_eurocode_return_level_samples_for_year_of_interest)
@property
def posterior_eurocode_return_level_uncertainty_interval_for_the_year_of_interest(self):
def confidence_interval(self):
# Bottom and upper quantile correspond to the quantile
bottom_quantile = self.alpha_for_confidence_interval / 2
bottom_and_upper_quantile = (bottom_quantile, 1 - bottom_quantile)
return [np.quantile(self.posterior_eurocode_return_level_samples_for_year_of_interest, q=q)
for q in bottom_and_upper_quantile]
\ No newline at end of file
for q in bottom_and_upper_quantile]
......@@ -2,6 +2,7 @@ import numpy as np
import pandas as pd
from rpy2 import robjects
from extreme_fit.distribution.gev.gev_params import GevParams
from extreme_fit.model.result_from_model_fit.abstract_result_from_model_fit import \
AbstractResultFromModelFit
from extreme_fit.model.utils import r
......@@ -16,3 +17,27 @@ class AbstractResultFromExtremes(AbstractResultFromModelFit):
def load_dataframe_from_r_matrix(self, name):
r_matrix = self.name_to_value[name]
return pd.DataFrame(np.array(r_matrix), columns=r.colnames(r_matrix))
def confidence_interval_method(self, quantile_level, alpha_interval, temporal_covariate):
return_period = round(1 / (1 - quantile_level))
common_kwargs = {
'return.period': return_period,
'alpha': alpha_interval,
'tscale': False,
'type': r.c("return.level")
}
if self.gev_param_name_to_dim:
d = {GevParams.greek_letter_from_gev_param_name(gev_param_name) + '1': r.c(temporal_covariate) for
gev_param_name in self.gev_param_name_to_dim.keys()}
kwargs = {
"vals": r.list(**d
)
}
qcov = r("make.qcov")(self.result_from_fit,
**kwargs)
common_kwargs['qcov'] = qcov
mean_estimate, confidence_interval = self._confidence_interval_method(common_kwargs)
return mean_estimate, confidence_interval
def _confidence_interval_method(self, common_kwargs):
raise NotImplementedError
......@@ -3,9 +3,9 @@ from enum import Enum
class ConfidenceIntervalMethodFromExtremes(Enum):
# Confidence interval from the ci function
bayes = 0
normal = 1
boot = 2
proflik = 3
ci_bayes = 0
ci_normal = 1
ci_boot = 2
ci_proflik = 3
# Confidence interval from my functions
my_bayes = 4
......@@ -21,9 +21,13 @@ def compute_eurocode_confidence_interval(last_year_for_the_data, smooth_maxima_x
class EurocodeConfidenceIntervalFromExtremes(object):
YEAR_OF_INTEREST = 2017
def __init__(self, posterior_mean, poster_uncertainty_interval):
self.posterior_mean = posterior_mean
self.poster_uncertainty_interval = poster_uncertainty_interval
def __init__(self, mean_estimate, confidence_interval):
self.mean_estimate = mean_estimate
self.confidence_interval = confidence_interval
@property
def triplet(self):
return self.confidence_interval[0], self.mean_estimate, self.confidence_interval[1]
@classmethod
def from_estimator_extremes(cls, estimator_extremes: LinearMarginEstimator,
......@@ -32,16 +36,15 @@ class EurocodeConfidenceIntervalFromExtremes(object):
extractor = ExtractEurocodeReturnLevelFromMyBayesianExtremes(estimator_extremes, ci_method, cls.YEAR_OF_INTEREST)
else:
extractor = ExtractEurocodeReturnLevelFromCiMethod(estimator_extremes, ci_method, cls.YEAR_OF_INTEREST)
return cls(extractor.posterior_mean_eurocode_return_level_for_the_year_of_interest,
extractor.posterior_eurocode_return_level_uncertainty_interval_for_the_year_of_interest)
return cls(extractor.mean_estimate, extractor.confidence_interval)
@classmethod
def from_maxima_years_model_class(cls, maxima, years, model_class,
ci_method=ConfidenceIntervalMethodFromExtremes.bayes):
ci_method=ConfidenceIntervalMethodFromExtremes.ci_bayes):
# Load coordinates and dataset
coordinates, dataset = load_temporal_coordinates_and_dataset(maxima, years)
# Select fit method depending on the ci_method
if ci_method in [ConfidenceIntervalMethodFromExtremes.bayes,
if ci_method in [ConfidenceIntervalMethodFromExtremes.ci_bayes,
ConfidenceIntervalMethodFromExtremes.my_bayes]:
fit_method = TemporalMarginFitMethod.extremes_fevd_bayesian
else:
......
import numpy as np
import pandas as pd
from cached_property import cached_property
from rpy2 import robjects
from extreme_fit.model.result_from_model_fit.result_from_extremes.abstract_result_from_extremes import \
AbstractResultFromExtremes
from extreme_fit.model.result_from_model_fit.utils import get_margin_coef_ordered_dict
from extreme_fit.model.utils import r
class ResultFromBayesianExtremes(AbstractResultFromExtremes):
......@@ -13,8 +16,9 @@ class ResultFromBayesianExtremes(AbstractResultFromExtremes):
super().__init__(result_from_fit, gev_param_name_to_dim)
self.burn_in_percentage = burn_in_percentage
def burn_in_nb(self, df_all_samples):
return int(self.burn_in_percentage * len(df_all_samples))
@property
def burn_in_nb(self):
return int(self.burn_in_percentage * len(self.df_all_samples))
@property
def chain_info(self):
......@@ -24,11 +28,13 @@ class ResultFromBayesianExtremes(AbstractResultFromExtremes):
def results(self):
return self.load_dataframe_from_r_matrix('results')
@cached_property
def df_all_samples(self):
return pd.concat([self.results.iloc[:, :-1], self.chain_info.iloc[:, -2:]], axis=1)
@property
def df_posterior_samples(self) -> pd.DataFrame:
df_all_samples = pd.concat([self.results.iloc[:, :-1], self.chain_info.iloc[:, -2:]], axis=1)
df_posterior_samples = df_all_samples.iloc[self.burn_in_nb(df_all_samples):, :]
return df_posterior_samples
return self.df_all_samples.iloc[self.burn_in_nb:, :]
def get_coef_dict_from_posterior_sample(self, s: pd.Series):
assert len(s) >= 3
......@@ -40,3 +46,16 @@ class ResultFromBayesianExtremes(AbstractResultFromExtremes):
""" It is the coef for the margin function corresponding to the mean posterior parameters """
mean_posterior_parameters = self.df_posterior_samples.iloc[:, :-2].mean(axis=0)
return self.get_coef_dict_from_posterior_sample(mean_posterior_parameters)
def _confidence_interval_method(self, common_kwargs):
bayesian_ci_parameters = {
'burn.in': self.burn_in_nb,
'FUN': "mean",
}
res = r.ci(self.result_from_fit, **bayesian_ci_parameters, **common_kwargs)
d = self.get_python_dictionary(res)
keys = ['Posterior Mean 50-year level', '95% lower CI', '95% upper CI']
mean_estimate, lower, upper = [np.array(d[k])[0] for k in keys]
return mean_estimate, (lower, upper)
import unittest
import numpy as np
import pandas as pd
from experiment.trend_analysis.univariate_test.utils import fitted_linear_margin_estimator
from extreme_fit.model.margin_model.linear_margin_model.abstract_temporal_linear_margin_model import \
TemporalMarginFitMethod
from extreme_fit.model.margin_model.linear_margin_model.temporal_linear_margin_models import StationaryTemporalModel, \
NonStationaryLocationTemporalModel, NonStationaryLocationAndScaleTemporalModel
from extreme_fit.model.result_from_model_fit.result_from_extremes.confidence_interval_method import \
ConfidenceIntervalMethodFromExtremes
from extreme_fit.model.result_from_model_fit.result_from_extremes.eurocode_return_level_uncertainties import \
EurocodeConfidenceIntervalFromExtremes
from extreme_fit.model.utils import r, set_seed_r, set_seed_for_test
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.coordinates.temporal_coordinates.abstract_temporal_coordinates import \
AbstractTemporalCoordinates
from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset
from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import \
AbstractSpatioTemporalObservations
class TestConfidenceInterval(unittest.TestCase):
def setUp(self) -> None:
set_seed_for_test()
r("""
N <- 50
loc = 0; scale = 1; shape <- 1
x_gev <- rgev(N, loc = loc, scale = scale, shape = shape)
start_loc = 0; start_scale = 1; start_shape = 1
""")
# Compute the stationary temporal margin with isMev
self.start_year = 0
df = pd.DataFrame({AbstractCoordinates.COORDINATE_T: range(self.start_year, self.start_year + 50)})
self.coordinates = AbstractTemporalCoordinates.from_df(df)
df2 = pd.DataFrame(data=np.array(r['x_gev']), index=df.index)
observations = AbstractSpatioTemporalObservations(df_maxima_gev=df2)
self.dataset = AbstractDataset(observations=observations, coordinates=self.coordinates)
self.model_classes = [StationaryTemporalModel]
def compute_eurocode_ci(self, model_class):
estimator = fitted_linear_margin_estimator(model_class, self.coordinates, self.dataset,
starting_year=0,
fit_method=self.fit_method)
return EurocodeConfidenceIntervalFromExtremes.from_estimator_extremes(estimator, self.ci_method)
def test_my_bayes(self):
self.fit_method = TemporalMarginFitMethod.extremes_fevd_bayesian
self.ci_method = ConfidenceIntervalMethodFromExtremes.my_bayes
self.model_class_to_triplet = {
StationaryTemporalModel: (6.756903450587758, 10.316338515219085, 15.77861914935531),
NonStationaryLocationTemporalModel: (6.047033481540427, 9.708540966532225, 14.74058551926604),
NonStationaryLocationAndScaleTemporalModel: (6.383536224810785, 9.69120774797993, 13.917914357321615),
}
def test_ci_bayes(self):
self.fit_method = TemporalMarginFitMethod.extremes_fevd_bayesian
self.ci_method = ConfidenceIntervalMethodFromExtremes.ci_bayes
self.model_class_to_triplet = {
StationaryTemporalModel: (6.756903450587758, 10.316338515219085, 15.77861914935531),
# NonStationaryLocationTemporalModel: (6.047033481540427, 9.708540966532225, 14.74058551926604),
# NonStationaryLocationAndScaleTemporalModel: (6.383536224810785, 9.69120774797993, 13.917914357321615),
}
def tearDown(self) -> None:
for model_class, expected_triplet in self.model_class_to_triplet.items():
eurocode_ci = self.compute_eurocode_ci(StationaryTemporalModel)
found_triplet = eurocode_ci.triplet
for a, b in zip(expected_triplet, found_triplet):
self.assertAlmostEqual(a, b, msg="{} {}".format(model_class, found_triplet))
if __name__ == '__main__':
unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment