Commit d9b904b9 authored by Le Roux Erwan's avatar Le Roux Erwan
Browse files

[contrasting] add param_name_to_list_for_result. add polynomial models.

parent 9dd7dc4c
No related merge requests found
Showing with 310 additions and 33 deletions
+310 -33
......@@ -40,11 +40,11 @@ class GevParams(AbstractExtremeParams):
def return_level(self, return_period):
return self.quantile(1 - 1 / return_period)
def density(self, x):
def density(self, x, log_scale=False):
if self.has_undefined_parameters:
return np.nan
else:
res = r.dgev(x, self.location, self.scale, self.shape)
res = r.dgev(x, self.location, self.scale, self.shape, log_scale)
if isinstance(x, float):
return res[0]
else:
......
......@@ -23,7 +23,7 @@ colnames(coord) = c("T")
coord = data.frame(coord, stringsAsFactors = TRUE)
# res = fevd_fixed(x_gev, data=coord, method='MLE', verbose=TRUE, use.phi=FALSE)
# res = fevd_fixed(x_gev, data=coord, location.fun= ~T, scale.fun= ~T, method='MLE', type="GEV", verbose=FALSE, use.phi=FALSE)
res = fevd_fixed(x_gev, data=coord, location.fun= ~T, method='MLE', type="GEV", verbose=FALSE, use.phi=FALSE)
res = fevd_fixed(x_gev, data=coord, location.fun= ~poly(T, 2), method='MLE', type="GEV", verbose=FALSE, use.phi=FALSE)
print(res)
# Some display for the results
......
......@@ -5,12 +5,12 @@ from extreme_fit.model.margin_model.linear_margin_model.linear_margin_model impo
from extreme_fit.function.margin_function.linear_margin_function import LinearMarginFunction
def load_margin_function(estimator: AbstractEstimator, margin_model: LinearMarginModel,
margin_function_class=LinearMarginFunction, coef_dict=None):
def load_margin_function(estimator: AbstractEstimator, margin_model: LinearMarginModel, coef_dict=None):
if coef_dict is None:
coef_dict = estimator.result_from_model_fit.margin_coef_ordered_dict
margin_function_class = type(margin_model.margin_function)
return margin_function_class.from_coef_dict(coordinates=estimator.dataset.coordinates,
param_name_to_dims=margin_model.margin_function.param_name_to_dims,
param_name_to_dims=margin_model.param_name_to_list_for_result,
coef_dict=coef_dict,
starting_point=margin_model.starting_point)
......
from typing import Dict, List, Union, Tuple
import numpy as np
from extreme_fit.distribution.gev.gev_params import GevParams
from extreme_fit.function.margin_function.linear_margin_function import LinearMarginFunction
from extreme_fit.function.param_function.abstract_coef import AbstractCoef
from extreme_fit.function.param_function.param_function import LinearParamFunction, PolynomialParamFunction
from extreme_fit.function.param_function.polynomial_coef import PolynomialAllCoef
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
class PolynomialMarginFunction(LinearMarginFunction):
def __init__(self, coordinates: AbstractCoordinates, param_name_to_dim_and_degree: Dict[str, List[Tuple[int, int]]],
param_name_to_coef: Dict[str, PolynomialAllCoef], starting_point: Union[None, int] = None,
params_class: type = GevParams):
param_name_to_dims = {}
for param_name in param_name_to_dim_and_degree.keys():
dims = [c[0] for c in param_name_to_dim_and_degree[param_name]]
param_name_to_dims[param_name] = dims
self.param_name_to_dim_and_degree = param_name_to_dim_and_degree
super().__init__(coordinates, param_name_to_dims, param_name_to_coef, starting_point, params_class)
COEF_CLASS = PolynomialAllCoef
def load_specific_param_function(self, param_name):
return PolynomialParamFunction(dim_and_degree=self.param_name_to_dim_and_degree[param_name],
coef=self.param_name_to_coef[param_name])
def get_params(self, coordinate: np.ndarray, is_transformed: bool = True) -> GevParams:
return super().get_params(coordinate, is_transformed)
@classmethod
def from_coef_dict(cls, coordinates: AbstractCoordinates, param_name_to_dims: Dict[str, List[Tuple[int, int]]],
coef_dict: Dict[str, float], starting_point: Union[None, int] = None):
param_name_to_dim_and_degree = param_name_to_dims
assert cls.COEF_CLASS is not None, 'a COEF_CLASS class attributes needs to be defined'
param_name_to_coef = {}
for param_name in GevParams.PARAM_NAMES:
dims = param_name_to_dim_and_degree.get(param_name, [])
coef = cls.COEF_CLASS.from_coef_dict(coef_dict=coef_dict, param_name=param_name,
dims=dims,
coordinates=coordinates)
param_name_to_coef[param_name] = coef
return cls(coordinates, param_name_to_dim_and_degree, param_name_to_coef, starting_point)
......@@ -2,6 +2,7 @@ from typing import List
import numpy as np
from extreme_fit.function.param_function.linear_coef import LinearCoef
from extreme_fit.function.param_function.one_axis_param_function import LinearOneAxisParamFunction
from extreme_fit.function.param_function.polynomial_coef import PolynomialAllCoef, PolynomialCoef
from extreme_fit.function.param_function.spline_coef import SplineCoef
......@@ -50,6 +51,20 @@ class LinearParamFunction(AbstractParamFunction):
return self.dim_to_linear_one_axis_param_function[dim].coef
class PolynomialParamFunction(AbstractParamFunction):
def __init__(self, dim_and_degree, coef: PolynomialAllCoef):
self.coef = coef
self.dim_and_degree = dim_and_degree
def get_param_value(self, coordinate: np.ndarray) -> float:
gev_param_value = 0
for dim, max_degree in self.dim_and_degree:
for degree in range(max_degree+1):
polynomial_coef = self.coef.dim_to_polynomial_coef[dim] # type: PolynomialCoef
polynomial_coef_value = polynomial_coef.idx_to_coef[degree]
gev_param_value += polynomial_coef_value * np.power(coordinate[dim], degree)
return gev_param_value
class SplineParamFunction(AbstractParamFunction):
......
from typing import Dict, List
from extreme_fit.function.param_function.abstract_coef import AbstractCoef
from extreme_fit.function.param_function.linear_coef import LinearCoef
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
class PolynomialCoef(AbstractCoef):
"""
Object that maps each degree to its corresponding coefficient.
degree = 1 correspond to the coefficient of the first order polynomial
degree = 2 correspond to the the coefficient of the first order polynomial
degree = 3 correspond to the the coefficient of the first order polynomial
"""
def __init__(self, param_name: str, default_value: float = 1.0, degree_to_coef=None):
super().__init__(param_name, default_value, idx_to_coef=degree_to_coef)
self.max_degree = max(self.idx_to_coef.keys()) if self.idx_to_coef is not None else None
def compute_default_value(self, idx):
return self.default_value / idx
class PolynomialAllCoef(LinearCoef):
def __init__(self, param_name, dim_to_polynomial_coef: Dict[int, PolynomialCoef], intercept=None):
super().__init__(param_name, 1.0, None)
self.dim_to_polynomial_coef = dim_to_polynomial_coef
self._intercept = intercept
@property
def intercept(self) -> float:
if self._intercept is not None:
return self._intercept
else:
return super().intercept
@classmethod
def from_coef_dict(cls, coef_dict: Dict[str, float], param_name: str, dims: List[int],
coordinates: AbstractCoordinates):
degree0 = coef_dict[cls.coef_template_str(param_name, coefficient_name=cls.INTERCEPT_NAME).format(1)]
list_dim_and_max_degree = dims
j = 2
if len(list_dim_and_max_degree) == 0:
dim_to_polynomial_coef = None
intercept = degree0
else:
intercept = None
dim_to_polynomial_coef = {}
for dim, max_degree in list_dim_and_max_degree:
coefficient_name = coordinates.coordinates_names[dim]
if coefficient_name == AbstractCoordinates.COORDINATE_T:
j = 1
degree_to_coef = {0: degree0}
for degree in range(1, max_degree + 1):
coef_value = coef_dict[cls.coef_template_str(param_name, coefficient_name).format(j)]
degree_to_coef[degree] = coef_value
j += 1
dim_to_polynomial_coef[dim] = PolynomialCoef(param_name=param_name, degree_to_coef=degree_to_coef)
return cls(param_name=param_name, dim_to_polynomial_coef=dim_to_polynomial_coef, intercept=intercept)
def form_dict(self, coordinates_names: List[str]) -> Dict[str, str]:
if len(coordinates_names) >= 2:
raise NotImplementedError(
'Check how do we sum two polynomails without having two times an intercept parameter')
formula_list = []
if len(coordinates_names) == 0:
formula_str = '1'
else:
for dim, name in enumerate(coordinates_names):
polynomial_coef = self.dim_to_polynomial_coef[dim]
formula_list.append('poly({}, {}, raw = TRUE)'.format(name, polynomial_coef.max_degree))
formula_str = ' '.join(formula_list)
return {self.param_name + '.form': self.param_name + ' ~ ' + formula_str}
from typing import Dict
from typing import Dict, List
from extreme_fit.function.param_function.abstract_coef import AbstractCoef
class PolynomialCoef(AbstractCoef):
"""
Object that maps each degree to its corresponding coefficient.
degree = 1 correspond to the coefficient of the first order polynomial
degree = 2 correspond to the the coefficient of the first order polynomial
degree = 3 correspond to the the coefficient of the first order polynomial
"""
def __init__(self, param_name: str, default_value: float = 1.0, degree_to_coef=None):
super().__init__(param_name, default_value, idx_to_coef=degree_to_coef)
def compute_default_value(self, idx):
return self.default_value / idx
from extreme_fit.function.param_function.linear_coef import LinearCoef
from extreme_fit.function.param_function.polynomial_coef import PolynomialCoef
class KnotCoef(AbstractCoef):
......
......@@ -59,7 +59,7 @@ class AbstractTemporalLinearMarginModel(LinearMarginModel):
res = safe_run_r_estimator(function=r('gev.fit'),
xdat=x, y=y, mul=self.mul,
sigl=self.sigl, shl=self.shl)
return ResultFromIsmev(res, self.margin_function.param_name_to_dims)
return ResultFromIsmev(res, self.param_name_to_list_for_result)
# Gev fit with extRemes package
......@@ -85,7 +85,7 @@ class AbstractTemporalLinearMarginModel(LinearMarginModel):
method=method,
**r_type_argument_kwargs
)
return ResultFromMleExtremes(res, self.margin_function.param_name_to_dims,
return ResultFromMleExtremes(res, self.param_name_to_list_for_result,
type_for_mle=self.type_for_mle)
def extremes_fevd_bayesian_fit(self, x, df_coordinates_temp) -> AbstractResultFromExtremes:
......@@ -105,7 +105,7 @@ class AbstractTemporalLinearMarginModel(LinearMarginModel):
iter=self.nb_iterations_for_bayesian_fit,
**r_type_argument_kwargs
)
return ResultFromBayesianExtremes(res, self.margin_function.param_name_to_dims)
return ResultFromBayesianExtremes(res, self.param_name_to_list_for_result)
def extreme_arguments(self, df_coordinates_temp):
# Disable the use of log sigma parametrization
......
......@@ -35,6 +35,10 @@ class ParametricMarginModel(AbstractMarginModel, ABC):
assert isinstance(margin_function, ParametricMarginFunction)
return margin_function
@property
def param_name_to_list_for_result(self):
return self.margin_function.param_name_to_dims
def fitmargin_from_maxima_gev(self, data: np.ndarray, df_coordinates_spat: pd.DataFrame,
df_coordinates_temp: pd.DataFrame) -> ResultFromSpatialExtreme:
assert data.shape[1] == len(df_coordinates_spat)
......
from cached_property import cached_property
from extreme_fit.distribution.gev.gev_params import GevParams
from extreme_fit.function.margin_function.parametric_margin_function import ParametricMarginFunction
from extreme_fit.function.margin_function.polynomial_margin_function import PolynomialMarginFunction
from extreme_fit.function.param_function.polynomial_coef import PolynomialAllCoef, PolynomialCoef
from extreme_fit.model.margin_model.linear_margin_model.abstract_temporal_linear_margin_model import \
AbstractTemporalLinearMarginModel
from extreme_fit.model.margin_model.linear_margin_model.temporal_linear_margin_models import GumbelTemporalModel
from extreme_fit.model.margin_model.utils import MarginFitMethod
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
class PolynomialMarginModel(AbstractTemporalLinearMarginModel):
def __init__(self, coordinates: AbstractCoordinates, params_user=None, starting_point=None,
fit_method=MarginFitMethod.extremes_fevd_mle, nb_iterations_for_bayesian_fit=5000,
params_initial_fit_bayesian=None, type_for_MLE="GEV", params_class=GevParams, max_degree=2):
super().__init__(coordinates, params_user, starting_point, fit_method, nb_iterations_for_bayesian_fit,
params_initial_fit_bayesian, type_for_MLE, params_class)
self.max_degree = max_degree
# @property
# def nb_params(self):
# self.margin_function.param_name_to_coef
# return
@cached_property
def margin_function(self) -> PolynomialMarginFunction:
return super().margin_function
def load_margin_function(self, param_name_to_list_dim_and_degree=None):
param_name_to_polynomial_all_coef = self.param_name_to_polynomial_all_coef(
param_name_and_dim_and_degree_to_coef=self.params_sample)
return PolynomialMarginFunction(coordinates=self.coordinates,
param_name_to_coef=param_name_to_polynomial_all_coef,
param_name_to_dim_and_degree=param_name_to_list_dim_and_degree,
starting_point=self.starting_point,
params_class=self.params_class)
@property
def default_params(self) -> dict:
default_slope = 0.01
param_name_and_dim_and_degree_to_coef = {}
for param_name in self.params_class.PARAM_NAMES:
for dim in self.coordinates.coordinates_dims:
for degree in range(self.max_degree + 1):
param_name_and_dim_and_degree_to_coef[(param_name, dim, degree)] = default_slope
return param_name_and_dim_and_degree_to_coef
def param_name_to_polynomial_all_coef(self, param_name_and_dim_and_degree_to_coef):
param_name_to_polynomial_all_coef = {}
param_names = list(set([e[0] for e in param_name_and_dim_and_degree_to_coef.keys()]))
for param_name in param_names:
dim_to_polynomial_coef = {}
for dim in self.coordinates.coordinates_dims:
degree_to_coef = {}
for (param_name_loop, dim_loop, degree), coef in param_name_and_dim_and_degree_to_coef.items():
if param_name == param_name_loop and dim == dim_loop:
degree_to_coef[degree] = coef
dim_to_polynomial_coef[dim] = PolynomialCoef(param_name, degree_to_coef=degree_to_coef)
polynomial_all_coef = PolynomialAllCoef(param_name=param_name,
dim_to_polynomial_coef=dim_to_polynomial_coef)
param_name_to_polynomial_all_coef[param_name] = polynomial_all_coef
return param_name_to_polynomial_all_coef
@property
def param_name_to_list_for_result(self):
return self.margin_function.param_name_to_dim_and_degree
class NonStationaryQuadraticLocationModel(PolynomialMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 2)]})
class NonStationaryLocationGumbelModel(GumbelTemporalModel, NonStationaryQuadraticLocationModel):
pass
......@@ -11,8 +11,8 @@ def convertFloatVector_to_float(f):
return np.array(f)[0]
def get_margin_coef_ordered_dict(param_name_to_dim, mle_values, type_for_mle="GEV"):
assert param_name_to_dim is not None
def get_margin_coef_ordered_dict(param_name_to_dims, mle_values, type_for_mle="GEV"):
assert param_name_to_dims is not None
# Build the Coeff dict from param_name_to_dim
coef_dict = OrderedDict()
i = 0
......@@ -26,9 +26,15 @@ def get_margin_coef_ordered_dict(param_name_to_dim, mle_values, type_for_mle="GE
coef_dict[intercept_coef_name] = coef_value
i += 1
# Add a potential linear temporal trend
if param_name in param_name_to_dim:
temporal_coef_name = LinearCoef.coef_template_str(param_name,
AbstractCoordinates.COORDINATE_T).format(1)
coef_dict[temporal_coef_name] = mle_values[i]
i += 1
if param_name in param_name_to_dims:
dims = param_name_to_dims[param_name]
if isinstance(dims[0], int):
nb_parameters = 1
else:
nb_parameters = dims[0][1]
for j in range(nb_parameters):
temporal_coef_name = LinearCoef.coef_template_str(param_name,
AbstractCoordinates.COORDINATE_T).format(1 + j)
coef_dict[temporal_coef_name] = mle_values[i]
i += 1
return coef_dict
......@@ -92,7 +92,7 @@ def plot_mean(altitude_to_visualizer: Dict[int, StudyVisualizerForMeanValues], d
for i, (a, t) in enumerate(zip(altitudes_massif, trend_tests))
if not t.unconstrained_model_is_stationary]
altitudes_values, values = zip(*res)
moment = 'change in {} years for significant models'.format(nb_years)
moment = 'Change in the last {} years \nfor non-stationary models'.format(nb_years)
else:
moment = 'mean'
values = [t.unconstrained_estimator_gev_params_last_year.mean for t in trend_tests]
......
import unittest
import numpy as np
import pandas as pd
from extreme_fit.distribution.gev.gev_params import GevParams
from extreme_fit.model.margin_model.polynomial_margin_model import NonStationaryQuadraticLocationModel
from extreme_trend.abstract_gev_trend_test import fitted_linear_margin_estimator
from extreme_fit.model.margin_model.utils import \
MarginFitMethod
from extreme_fit.model.margin_model.linear_margin_model.temporal_linear_margin_models import StationaryTemporalModel, \
NonStationaryLocationTemporalModel, NonStationaryLocationAndScaleTemporalModel
from extreme_fit.model.utils import r, set_seed_r
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.coordinates.temporal_coordinates.abstract_temporal_coordinates import \
AbstractTemporalCoordinates
from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset
from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import \
AbstractSpatioTemporalObservations
class TestGevTemporalQuadraticExtremesMle(unittest.TestCase):
def setUp(self) -> None:
set_seed_r()
r("""
N <- 50
loc = 0; scale = 1; shape <- 1
x_gev <- rgev(N, loc = loc, scale = scale, shape = shape)
start_loc = 0; start_scale = 1; start_shape = 1
""")
# Compute the stationary temporal margin with isMev
self.start_year = 0
df = pd.DataFrame({AbstractCoordinates.COORDINATE_T: range(self.start_year, self.start_year + 50)})
self.coordinates = AbstractTemporalCoordinates.from_df(df)
df2 = pd.DataFrame(data=np.array(r['x_gev']), index=df.index)
observations = AbstractSpatioTemporalObservations(df_maxima_gev=df2)
self.dataset = AbstractDataset(observations=observations, coordinates=self.coordinates)
self.fit_method = MarginFitMethod.extremes_fevd_mle
def test_gev_temporal_margin_fit_non_stationary_quadratic_location(self):
# Create estimator
estimator = fitted_linear_margin_estimator(NonStationaryQuadraticLocationModel,
self.coordinates, self.dataset,
starting_year=0,
fit_method=self.fit_method)
# Checks that parameters returned are indeed different
mle_params_estimated_year1 = estimator.function_from_fit.get_params(np.array([1])).to_dict()
mle_params_estimated_year3 = estimator.function_from_fit.get_params(np.array([21])).to_dict()
mle_params_estimated_year5 = estimator.function_from_fit.get_params(np.array([41])).to_dict()
self.assertNotEqual(mle_params_estimated_year1, mle_params_estimated_year3)
self.assertNotEqual(mle_params_estimated_year3, mle_params_estimated_year5)
# Assert the relationship for the location is indeed quadratic
location_gev_params = GevParams.LOC
diff1 = mle_params_estimated_year1[location_gev_params] - mle_params_estimated_year3[location_gev_params]
diff2 = mle_params_estimated_year3[location_gev_params] - mle_params_estimated_year5[location_gev_params]
self.assertNotAlmostEqual(diff1, diff2)
# Assert that indicators are correctly computed
self.assertAlmostEqual(estimator.result_from_model_fit.nllh, estimator.nllh())
# self.assertAlmostEqual(estimator.result_from_model_fit.aic, estimator.aic())
# self.assertAlmostEqual(estimator.result_from_model_fit.bic, estimator.bic())
if __name__ == '__main__':
unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment