diff --git a/extreme_fit/distribution/abstract_params.py b/extreme_fit/distribution/abstract_params.py index 56852523d979f5fe58d134ff0a037a6785f381b9..def48e046b304a171f3d4fe3706125239a7eb4c5 100644 --- a/extreme_fit/distribution/abstract_params.py +++ b/extreme_fit/distribution/abstract_params.py @@ -39,6 +39,7 @@ class AbstractParams(object): raise NotImplementedError def to_dict(self) -> dict: + assert isinstance(self.param_values, List), self.param_values assert len(self.PARAM_NAMES) == len(self.param_values) return dict(zip(self.PARAM_NAMES, self.param_values)) diff --git a/extreme_fit/distribution/gev/evgam_example.R b/extreme_fit/distribution/gev/evgam_example.R index 0aadfd8848708efd87b3e347fcf2d7fcfd70acc2..5c9b143b0066034c0117cd9ee39ff07bc6975a30 100644 --- a/extreme_fit/distribution/gev/evgam_example.R +++ b/extreme_fit/distribution/gev/evgam_example.R @@ -10,8 +10,19 @@ COprcp_gev <- aggregate(prcp ~ year + meta_row, COprcp, max) COprcp_gev <- cbind(COprcp_gev, COprcp_meta[COprcp_gev$meta_row,]) # print(COprcp_gev) print('before call') -fmla_gev2 <- list(prcp ~ s(elev, bs="cr"), ~ s(lon, fx=FALSE, k=20), ~ 1) +# fmla_gev2 <- list(prcp ~ s(elev, k=3, m=1), ~ 1, ~ 1) +fmla_gev2 <- list(prcp ~ s(elev, bs="bs", k=4, m=2), ~ 1, ~ 1) m_gev2 <- evgam_fixed(fmla_gev2, data=COprcp_gev, family="gev") +print('print results') +print(m_gev2) +# print(m_gev2$coefficients) +location <- m_gev2$location +print(location) +# print(location$coefficients) +# smooth <- m_gev2$location$smooth +# print(smooth) +# print(attr(smooth, "qrc")) +# print(m_gev2.knots) # m_gev2 <- evgam(fmla_gev2, data=COprcp_gev, family="gev") # summary(m_gev2) # print(attributes(m_gev2)) diff --git a/extreme_fit/function/margin_function/spline_margin_function.py b/extreme_fit/function/margin_function/spline_margin_function.py index f3513046dcb149c6ab847e26f14deee5135a00cd..5ac8a21ea7e55b0792a05fdc1f1ed6099d64eca8 100644 --- a/extreme_fit/function/margin_function/spline_margin_function.py +++ b/extreme_fit/function/margin_function/spline_margin_function.py @@ -1,65 +1,58 @@ -from typing import Dict, List +from typing import Dict, List, Union, Tuple import numpy as np -from extreme_fit.function.margin_function.parametric_margin_function import \ - ParametricMarginFunction +from extreme_fit.distribution.gev.gev_params import GevParams +from extreme_fit.function.margin_function.linear_margin_function import LinearMarginFunction +from extreme_fit.function.margin_function.polynomial_margin_function import PolynomialMarginFunction from extreme_fit.function.param_function.abstract_coef import AbstractCoef -from extreme_fit.function.param_function.param_function import AbstractParamFunction, \ +from extreme_fit.function.param_function.param_function import LinearParamFunction, PolynomialParamFunction, \ SplineParamFunction -from extreme_fit.function.param_function.spline_coef import SplineCoef +from extreme_fit.function.param_function.polynomial_coef import PolynomialAllCoef +from extreme_fit.function.param_function.spline_coef import SplineAllCoef, SplineCoef from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates -class SplineMarginFunction(ParametricMarginFunction): - """ - -param_name_to_dims maps each GEV parameters to its correspond knot dimensions. - For instance, dims = [1,2] means the knot will be realized with 2D knots - dims = [1] means the knot will lie only on the first axis +class SplineMarginFunction(LinearMarginFunction): - """ + def __init__(self, coordinates: AbstractCoordinates, + param_name_to_dim_and_max_degree: Dict[str, List[Tuple[int, int]]], + param_name_to_coef: Dict[str, SplineAllCoef], starting_point: Union[None, int] = None, + params_class: type = GevParams, log_scale=None): + param_name_to_dims = {} + for param_name in param_name_to_dim_and_max_degree.keys(): + dims = [c[0] for c in param_name_to_dim_and_max_degree[param_name]] + param_name_to_dims[param_name] = dims + self.param_name_to_dim_and_max_degree = param_name_to_dim_and_max_degree + super().__init__(coordinates, param_name_to_dims, param_name_to_coef, starting_point, params_class, log_scale) - COEF_CLASS = SplineCoef + COEF_CLASS = SplineAllCoef - def __init__(self, coordinates: AbstractCoordinates, param_name_to_dims: Dict[str, List[int]], - param_name_to_coef: Dict[str, AbstractCoef], - param_name_to_nb_knots: Dict[str, int], - degree=3): - self.param_name_to_coef = None # type: Dict[str, SplineCoef] - # Attributes specific for SplineMarginFunction - self.param_name_to_nb_knots = param_name_to_nb_knots - assert degree % 2 == 1 - self.degree = degree - super().__init__(coordinates, param_name_to_dims, param_name_to_coef) - - - def compute_knots(self, dims, nb_knots) -> np.ndarray: - """Define the knots as the quantiles""" - return np.quantile(a=self.coordinates.df_all_coordinates.iloc[:, dims], q=np.linspace(0, 1, nb_knots+2)[1:-1]) - - @property - def form_dict(self) -> Dict[str, str]: - """ - 3 examples of potential form dict: - loc.form <- y ~ rb(locations[,1], knots = knots, degree = 3, penalty = .5) - scale.form <- y ~ rb(locations[,2], knots = knots2, degree = 3, penalty = .5) - shape.form <- y ~ rb(locations, knots = knots_tot, degree = 3, penalty = .5) - """ - pass - - def load_specific_param_function(self, param_name) -> AbstractParamFunction: - dims = self.param_name_to_dims[param_name] + def load_specific_param_function(self, param_name): coef = self.param_name_to_coef[param_name] - nb_knots = self.param_name_to_nb_knots[param_name] - knots = self.compute_knots(dims, nb_knots) - return SplineParamFunction(dims=dims, degree=self.degree, spline_coef=coef, knots=knots) - - - - - - - - - - + assert isinstance(coef, (PolynomialAllCoef, SplineAllCoef)) + if isinstance(coef, PolynomialAllCoef): + return PolynomialParamFunction(dim_and_degree=self.param_name_to_dim_and_max_degree[param_name], coef=coef) + else: + return SplineParamFunction(dim_and_degree=self.param_name_to_dim_and_max_degree[param_name], coef=coef) + + def get_params(self, coordinate: np.ndarray, is_transformed: bool = True) -> GevParams: + return super().get_params(coordinate, is_transformed) + + @classmethod + def from_coef_dict(cls, coordinates: AbstractCoordinates, param_name_to_dims: Dict[str, List[Tuple[int, int]]], + coef_dict: Dict[str, float], starting_point: Union[None, int] = None, log_scale=None): + + coef_dict, spline_param_name_to_dim_to_knots_and_coefficient = coef_dict + # Load polynomial coefficient + polynomial_margin_function = PolynomialMarginFunction.from_coef_dict(coordinates, param_name_to_dims, coef_dict, starting_point, log_scale) + param_name_to_coef = polynomial_margin_function.param_name_to_coef + param_name_to_dim_and_max_degree = param_name_to_dims + # Load the remaining spline coefficient + assert cls.COEF_CLASS is not None, 'a COEF_CLASS class attributes needs to be defined' + for param_name, dim_to_knots_and_coefficients in spline_param_name_to_dim_to_knots_and_coefficient.items(): + dim_to_spline_coef = {} + for dim, (knots, coefficients) in dim_to_knots_and_coefficients.items(): + dim_to_spline_coef[dim] = SplineCoef(param_name, coefficients, knots) + param_name_to_coef[param_name] = SplineAllCoef(param_name, dim_to_spline_coef) + return cls(coordinates, param_name_to_dim_and_max_degree, param_name_to_coef, starting_point, log_scale=log_scale) diff --git a/extreme_fit/function/param_function/param_function.py b/extreme_fit/function/param_function/param_function.py index e6c65b06149b0d3d86c1d900b3b1b17a9a5f306e..0f236d27d72a10f10986af2f0184bab7651bd7ac 100644 --- a/extreme_fit/function/param_function/param_function.py +++ b/extreme_fit/function/param_function/param_function.py @@ -2,10 +2,12 @@ import operator from functools import reduce from typing import List import numpy as np +from scipy.interpolate import BSpline + from extreme_fit.function.param_function.linear_coef import LinearCoef from extreme_fit.function.param_function.one_axis_param_function import LinearOneAxisParamFunction from extreme_fit.function.param_function.polynomial_coef import PolynomialAllCoef, PolynomialCoef -from extreme_fit.function.param_function.spline_coef import SplineCoef +from extreme_fit.function.param_function.spline_coef import PolynomialCoef, SplineAllCoef class AbstractParamFunction(object): @@ -78,25 +80,15 @@ class PolynomialParamFunction(AbstractParamFunction): class SplineParamFunction(AbstractParamFunction): - def __init__(self, dims, degree, spline_coef: SplineCoef, knots: np.ndarray) -> None: - self.spline_coef = spline_coef - self.degree = degree - self.dims = dims - self.knots = knots - - @property - def m(self) -> int: - return int((self.degree + 1) / 2) + def __init__(self, dim_and_degree, coef: SplineAllCoef) -> None: + self.coef = coef + self.dim_and_degree = dim_and_degree def get_param_value(self, coordinate: np.ndarray) -> float: - gev_param_value = self.spline_coef.intercept - # Polynomial part - for dim in self.dims: - polynomial_coef = self.spline_coef.dim_to_polynomial_coef[dim] - for degree in range(1, self.m): - gev_param_value += polynomial_coef.get_coef(degree) * coordinate[dim] - # Knot part - for idx, knot in enumerate(self.knots): - distance = np.power(np.linalg.norm(coordinate - knot), self.degree) - gev_param_value += self.spline_coef.knot_coef.get_coef(idx) * distance + gev_param_value = 0 + for i, (dim, max_degree) in enumerate(self.dim_and_degree): + assert isinstance(dim, int) + spline_coef = self.coef.dim_to_spline_coef[dim] + spl = BSpline(t=spline_coef.knots, c=spline_coef.coefficients, k=max_degree) + gev_param_value += spl(coordinate)[0] return gev_param_value diff --git a/extreme_fit/function/param_function/spline_coef.py b/extreme_fit/function/param_function/spline_coef.py index 8bab833c7ffbe95087438921841a2c71241a7516..204a8f68bc411505b23c99d81b052a64b7f3cb16 100644 --- a/extreme_fit/function/param_function/spline_coef.py +++ b/extreme_fit/function/param_function/spline_coef.py @@ -4,16 +4,44 @@ from extreme_fit.function.param_function.abstract_coef import AbstractCoef from extreme_fit.function.param_function.linear_coef import LinearCoef from extreme_fit.function.param_function.polynomial_coef import PolynomialCoef +from typing import Dict, List -class KnotCoef(AbstractCoef): - - def __init__(self, param_name: str, default_value: float = 1.0, idx_to_coef=None): - super().__init__(param_name, default_value, idx_to_coef) +from extreme_fit.function.param_function.abstract_coef import AbstractCoef +from extreme_fit.function.param_function.linear_coef import LinearCoef +from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates class SplineCoef(AbstractCoef): - def __init__(self, param_name: str, knot_coef: KnotCoef, dim_to_polynomial_coef: Dict[int, PolynomialCoef]): + def __init__(self, param_name: str, coefficients, knots): + super().__init__(param_name) + self.knots = knots + self.coefficients = coefficients + self.max_degree = self.nb_knots - (self.nb_coefficients + 1) + + @property + def nb_knots(self): + return len(self.knots) + + @property + def nb_coefficients(self): + return len(self.coefficients) + + +class SplineAllCoef(LinearCoef): + + def __init__(self, param_name, dim_to_spline_coef: Dict[int, SplineCoef]): super().__init__(param_name, 1.0, None) - self.knot_coef = knot_coef - self.dim_to_polynomial_coef = dim_to_polynomial_coef + self.dim_to_spline_coef = dim_to_spline_coef + + def form_dict(self, coordinates_names: List[str], dims) -> Dict[str, str]: + formula_list = [] + if len(coordinates_names) == 0: + formula_str = '1' + else: + for dim, name in zip(dims, coordinates_names): + spline_coef = self.dim_to_spline_coef[dim] + formula_list.append('s({}, m={}, k={}, bs="bs")'.format(name, spline_coef.max_degree, + spline_coef.nb_coefficients)) + formula_str = ' '.join(formula_list) + return {self.param_name + '.form': self.param_name + ' ~ ' + formula_str} diff --git a/extreme_fit/model/margin_model/linear_margin_model/margin_model_with_effect/abstract_margin_model_with_effect.py b/extreme_fit/model/margin_model/linear_margin_model/margin_model_with_effect/abstract_margin_model_with_effect.py deleted file mode 100644 index 72552ac741149d19a47ea5fa7eb0573923391111..0000000000000000000000000000000000000000 --- a/extreme_fit/model/margin_model/linear_margin_model/margin_model_with_effect/abstract_margin_model_with_effect.py +++ /dev/null @@ -1,5 +0,0 @@ - - -class AbstractMarginModelWithEffect(object): - pass - diff --git a/extreme_fit/model/margin_model/linear_margin_model/margin_model_with_effect/margin_model_with_gcm_effect.py b/extreme_fit/model/margin_model/linear_margin_model/margin_model_with_effect/margin_model_with_gcm_effect.py deleted file mode 100644 index 79e5361073091baa96ee385d16133bbac1934f14..0000000000000000000000000000000000000000 --- a/extreme_fit/model/margin_model/linear_margin_model/margin_model_with_effect/margin_model_with_gcm_effect.py +++ /dev/null @@ -1,11 +0,0 @@ -from extreme_fit.model.margin_model.linear_margin_model.margin_model_with_effect.abstract_margin_model_with_effect import \ - AbstractMarginModelWithEffect -from extreme_fit.model.margin_model.polynomial_margin_model.gev_altitudinal_models import StationaryAltitudinal - - -class MarginModelWithGcmEffect(AbstractMarginModelWithEffect): - pass - - -class StationaryAltitudinalWithGCMEffect(MarginModelWithGcmEffect, StationaryAltitudinal): - pass diff --git a/extreme_fit/model/margin_model/spline_margin_model.py b/extreme_fit/model/margin_model/spline_margin_model.py deleted file mode 100644 index b5d97d2208c097ad6e6fad9cf87e63aec9e3dcfa..0000000000000000000000000000000000000000 --- a/extreme_fit/model/margin_model/spline_margin_model.py +++ /dev/null @@ -1,59 +0,0 @@ -from typing import Dict, List - -from extreme_fit.function.margin_function.spline_margin_function import SplineMarginFunction -from extreme_fit.function.param_function.abstract_coef import AbstractCoef -from extreme_fit.function.param_function.spline_coef import SplineCoef, KnotCoef, \ - PolynomialCoef -from extreme_fit.model.margin_model.parametric_margin_model import ParametricMarginModel -from extreme_fit.distribution.gev.gev_params import GevParams -from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates - - -class SplineMarginModel(ParametricMarginModel): - - def __init__(self, coordinates: AbstractCoordinates, - params_user=None): - super().__init__(coordinates, params_user) - - def load_margin_function(self, param_name_to_dims: Dict[str, List[int]] = None, - param_name_to_coef: Dict[str, AbstractCoef] = None, - param_name_to_nb_knots: Dict[str, int] = None, - degree=3): - # Default parameters - # todo: for the default parameters: take inspiration from the linear_margin_model - # also implement the class method thing - if param_name_to_dims is None: - param_name_to_dims = {param_name: self.coordinates.coordinates_dims - for param_name in GevParams.PARAM_NAMES} - if param_name_to_coef is None: - param_name_to_coef = {} - for param_name in GevParams.PARAM_NAMES: - knot_coef = KnotCoef(param_name) - polynomial_coef = PolynomialCoef(param_name) - dim_to_polynomial_coef = {dim: polynomial_coef for dim in self.coordinates.coordinates_dims} - spline_coef = SplineCoef(param_name, knot_coef, dim_to_polynomial_coef) - param_name_to_coef[param_name] = spline_coef - if param_name_to_nb_knots is None: - param_name_to_nb_knots = {param_name: 2 for param_name in GevParams.PARAM_NAMES} - - return SplineMarginFunction(coordinates=self.coordinates, - param_name_to_dims=param_name_to_dims, - param_name_to_coef=param_name_to_coef, - param_name_to_nb_knots=param_name_to_nb_knots, - degree=degree) - - -class ConstantSplineMarginModel(SplineMarginModel): - - def load_margin_function(self, param_name_to_dims: Dict[str, List[int]] = None, - param_name_to_coef: Dict[str, AbstractCoef] = None, - param_name_to_nb_knots: Dict[str, int] = None, degree=3): - return super().load_margin_function({}, param_name_to_coef, param_name_to_nb_knots, degree) - - -class Degree1SplineMarginModel(SplineMarginModel): - - def load_margin_function(self, param_name_to_dims: Dict[str, List[int]] = None, - param_name_to_coef: Dict[str, AbstractCoef] = None, - param_name_to_nb_knots: Dict[str, int] = None, degree=3): - return super().load_margin_function(param_name_to_dims, param_name_to_coef, param_name_to_nb_knots, degree=1) diff --git a/extreme_fit/model/margin_model/linear_margin_model/margin_model_with_effect/__init__.py b/extreme_fit/model/margin_model/spline_margin_model/__init__.py similarity index 100% rename from extreme_fit/model/margin_model/linear_margin_model/margin_model_with_effect/__init__.py rename to extreme_fit/model/margin_model/spline_margin_model/__init__.py diff --git a/extreme_fit/model/margin_model/spline_margin_model/spline_margin_model.py b/extreme_fit/model/margin_model/spline_margin_model/spline_margin_model.py new file mode 100644 index 0000000000000000000000000000000000000000..d13e746ee788b4f0de31126fc2e80d50532c14e2 --- /dev/null +++ b/extreme_fit/model/margin_model/spline_margin_model/spline_margin_model.py @@ -0,0 +1,92 @@ +import itertools + +import numpy as np +from cached_property import cached_property + +from extreme_fit.distribution.gev.gev_params import GevParams +from extreme_fit.function.margin_function.spline_margin_function import SplineMarginFunction +from extreme_fit.function.param_function.polynomial_coef import PolynomialCoef +from extreme_fit.function.param_function.spline_coef import SplineAllCoef, SplineCoef +from extreme_fit.model.margin_model.linear_margin_model.abstract_temporal_linear_margin_model import \ + AbstractTemporalLinearMarginModel +from extreme_fit.model.margin_model.utils import MarginFitMethod +from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates + + +class SplineMarginModel(AbstractTemporalLinearMarginModel): + + def __init__(self, coordinates: AbstractCoordinates, params_user=None, starting_point=None, + fit_method=MarginFitMethod.extremes_fevd_mle, nb_iterations_for_bayesian_fit=5000, + params_initial_fit_bayesian=None, type_for_MLE="GEV", params_class=GevParams, max_degree=1, + temporal_covariate_for_fit=None): + super().__init__(coordinates, params_user, starting_point, fit_method, nb_iterations_for_bayesian_fit, + params_initial_fit_bayesian, type_for_MLE, params_class, temporal_covariate_for_fit) + self.max_degree = max_degree + + @property + def nb_params(self): + return sum([c.nb_params for c in self.margin_function.param_name_to_coef.values()]) + + @cached_property + def margin_function(self) -> SplineMarginFunction: + return super().margin_function + + def load_margin_function(self, param_name_to_list_dim_and_degree_and_nb_intervals=None): + # Assert the order of list of dim and degree, to match the order of the form dict, + # i.e. 1) spatial individual terms 2) combined terms 3) temporal individual terms + for param_name, list_dim_and_degree_and_nb_intervals in param_name_to_list_dim_and_degree_and_nb_intervals.items(): + dims = [d for d, m, nb in list_dim_and_degree_and_nb_intervals] + assert all([isinstance(d, int) or isinstance(d, tuple) for d in dims]) + if self.coordinates.has_spatial_coordinates and self.coordinates.idx_x_coordinates in dims: + assert dims.index(self.coordinates.idx_x_coordinates) == 0 + if self.coordinates.has_temporal_coordinates and self.coordinates.idx_temporal_coordinates in dims: + assert dims.index(self.coordinates.idx_temporal_coordinates) == len(dims) - 1 + # Assert that the degree are inferior to the max degree + for list_dim_and_degree_and_nb_intervals in param_name_to_list_dim_and_degree_and_nb_intervals.values(): + for _, max_degree, _ in list_dim_and_degree_and_nb_intervals: + assert max_degree <= self.max_degree, 'Max degree (={}) specified is too high'.format(max_degree) + # Load param_name_to_spline_all_coef + param_name_to_spline_all_coef = self.param_name_to_spline_all_coef( + param_name_to_list_dim_and_degree_and_nb_intervals=param_name_to_list_dim_and_degree_and_nb_intervals, + param_name_and_dim_and_degree_to_default_coef=self.default_params) + param_name_to_dim_and_max_degree = {p: [t[:2] for t in l] for p, l in param_name_to_list_dim_and_degree_and_nb_intervals.items()} + return SplineMarginFunction(coordinates=self.coordinates, + param_name_to_coef=param_name_to_spline_all_coef, + param_name_to_dim_and_max_degree=param_name_to_dim_and_max_degree, + starting_point=self.starting_point, + params_class=self.params_class) + + @property + def default_params(self) -> dict: + default_slope = 0.01 + param_name_and_dim_and_degree_to_coef = {} + for param_name in self.params_class.PARAM_NAMES: + all_individual_dims = self.coordinates.coordinates_dims + combinations_of_two_dims = list(itertools.combinations(all_individual_dims, 2)) + dims = all_individual_dims + combinations_of_two_dims + for dim in dims: + for degree in range(self.max_degree + 1): + param_name_and_dim_and_degree_to_coef[(param_name, dim, degree)] = default_slope + return param_name_and_dim_and_degree_to_coef + + def param_name_to_spline_all_coef(self, param_name_to_list_dim_and_degree_and_nb_intervals, + param_name_and_dim_and_degree_to_default_coef): + param_name_to_spline_all_coef = {} + param_names = list(set([e[0] for e in param_name_and_dim_and_degree_to_default_coef.keys()])) + for param_name in param_names: + dim_to_spline_coef = {} + for dim, max_degree, nb_intervals in param_name_to_list_dim_and_degree_and_nb_intervals.get(param_name, []): + nb_coefficients = nb_intervals + 1 + coefficients = np.arange(nb_coefficients) + knots = np.arange(nb_coefficients + max_degree + 1) + dim_to_spline_coef[dim] = SplineCoef(param_name, knots=knots, coefficients=coefficients) + if len(dim_to_spline_coef) == 0: + dim_to_spline_coef = None + spline_all_coef = SplineAllCoef(param_name=param_name, + dim_to_spline_coef=dim_to_spline_coef) + param_name_to_spline_all_coef[param_name] = spline_all_coef + return param_name_to_spline_all_coef + + @property + def param_name_to_list_for_result(self): + return self.margin_function.param_name_to_dim_and_max_degree diff --git a/extreme_fit/model/margin_model/spline_margin_model/temporal_spline_model_degree_1.py b/extreme_fit/model/margin_model/spline_margin_model/temporal_spline_model_degree_1.py new file mode 100644 index 0000000000000000000000000000000000000000..5fa8f0dc41ac4bd83b1757e719bcf7822ea72a93 --- /dev/null +++ b/extreme_fit/model/margin_model/spline_margin_model/temporal_spline_model_degree_1.py @@ -0,0 +1,16 @@ +from extreme_fit.distribution.gev.gev_params import GevParams +from extreme_fit.model.margin_model.spline_margin_model.spline_margin_model import SplineMarginModel + + +class NonStationaryTwoLinearLocationModel(SplineMarginModel): + + def load_margin_function(self, param_name_to_dims=None): + # Degree 1, Two Linear sections for the location + return super().load_margin_function({GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1, 2)]}) + + +class NonStationaryTwoLinearScaleModel(SplineMarginModel): + + def load_margin_function(self, param_name_to_dims=None): + # Degree 1, Two Linear sections for the scale parameters + return super().load_margin_function({GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)]}) diff --git a/extreme_fit/model/result_from_model_fit/result_from_extremes/result_from_evgam.py b/extreme_fit/model/result_from_model_fit/result_from_extremes/result_from_evgam.py index 3687cc0c0a1af66793b722f60db68ed8f827c10c..592066be4c5b3cdbb77aaccceb542746d43a1580 100644 --- a/extreme_fit/model/result_from_model_fit/result_from_extremes/result_from_evgam.py +++ b/extreme_fit/model/result_from_model_fit/result_from_extremes/result_from_evgam.py @@ -60,8 +60,40 @@ class ResultFromEvgam(AbstractResultFromExtremes): @property def margin_coef_ordered_dict(self): - # print(self.name_to_value.keys()) - # raise NotImplementedError - values = np.array(self.name_to_value['coefficients']) - return get_margin_coef_ordered_dict(self.param_name_to_dim, values, self.type_for_mle, - dim_to_coordinate_name=self.dim_to_coordinate) + coefficients = np.array(self.name_to_value['coefficients']) + param_name_to_str_formula = {k: str(v) for k, v in self.get_python_dictionary(self.name_to_value['formula']).items()} + r_param_names_with_spline = [k for k, v in param_name_to_str_formula.items() if "s(" in v] + if len(r_param_names_with_spline) == 0: + return get_margin_coef_ordered_dict(self.param_name_to_dim, coefficients, self.type_for_mle, + dim_to_coordinate_name=self.dim_to_coordinate) + else: + # todo we might need to delete some coefficient for the spline param so that it does not create some assertion error + coef_dict = get_margin_coef_ordered_dict(self.param_name_to_dim, coefficients, self.type_for_mle, + dim_to_coordinate_name=self.dim_to_coordinate) + spline_param_name_to_dim_knots_and_coefficient = {} + for r_param_name in r_param_names_with_spline: + print('here') + param_name = self.r_param_name_to_param_name[r_param_name] + dim_knots_and_coefficient = {} + dims = self.param_name_to_dim[param_name] + if len(dims) > 1: + raise NotImplementedError + else: + dim, max_degree = dims[0] + d = self.get_python_dictionary(self.name_to_value[r_param_name]) + coefficients = np.array(d["coefficients"]) + smooth = list(d['smooth'])[0] + knots = np.array(self.get_python_dictionary(smooth)['knots']) + assert len(knots) == len(coefficients) + 1 + max_degree + dim_knots_and_coefficient[dim] = (knots, coefficients) + spline_param_name_to_dim_knots_and_coefficient[param_name] = dim_knots_and_coefficient + + return coef_dict, spline_param_name_to_dim_knots_and_coefficient + + @property + def r_param_name_to_param_name(self): + return { + 'location': GevParams.LOC, + 'scale': GevParams.SCALE, + 'shape': GevParams.SHAPE, + } \ No newline at end of file diff --git a/test/test_extreme_fit/test_estimator/test_temporal_estimator/test_gev_temporal_spline.py b/test/test_extreme_fit/test_estimator/test_temporal_estimator/test_gev_temporal_spline.py new file mode 100644 index 0000000000000000000000000000000000000000..90360e3ba5b22ab6aab16ca9347b216638ddbd10 --- /dev/null +++ b/test/test_extreme_fit/test_estimator/test_temporal_estimator/test_gev_temporal_spline.py @@ -0,0 +1,76 @@ +import unittest + +import numpy as np +import pandas as pd + +from extreme_fit.distribution.gev.gev_params import GevParams +from extreme_fit.model.margin_model.polynomial_margin_model.polynomial_margin_model import NonStationaryQuadraticLocationModel, \ + NonStationaryQuadraticScaleModel, NonStationaryQuadraticLocationGumbelModel, NonStationaryQuadraticScaleGumbelModel +from extreme_fit.model.margin_model.spline_margin_model.temporal_spline_model_degree_1 import \ + NonStationaryTwoLinearLocationModel, NonStationaryTwoLinearScaleModel +from extreme_trend.trend_test.abstract_gev_trend_test import fitted_linear_margin_estimator +from extreme_fit.model.margin_model.utils import \ + MarginFitMethod +from extreme_fit.model.utils import r, set_seed_r +from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates +from spatio_temporal_dataset.coordinates.temporal_coordinates.abstract_temporal_coordinates import \ + AbstractTemporalCoordinates +from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset +from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import \ + AbstractSpatioTemporalObservations + + +class TestGevTemporalSpline(unittest.TestCase): + + def setUp(self) -> None: + set_seed_r() + r(""" + N <- 50 + loc = 0; scale = 1; shape <- 1 + x_gev <- rgev(N, loc = loc, scale = scale, shape = shape) + start_loc = 0; start_scale = 1; start_shape = 1 + """) + # Compute the stationary temporal margin with isMev + self.start_year = 0 + nb_years = 50 + self.last_year = self.start_year + nb_years - 1 + df = pd.DataFrame({AbstractCoordinates.COORDINATE_T: range(self.start_year, self.start_year + nb_years)}) + self.coordinates = AbstractTemporalCoordinates.from_df(df) + df2 = pd.DataFrame(data=np.array(r['x_gev']), index=df.index) + observations = AbstractSpatioTemporalObservations(df_maxima_gev=df2) + self.dataset = AbstractDataset(observations=observations, coordinates=self.coordinates) + self.fit_method = MarginFitMethod.evgam + + def function_test_gev_temporal_margin_fit_non_stationary_spline(self, model_class, param_to_test): + # Create estimator + estimator = fitted_linear_margin_estimator(model_class, + self.coordinates, self.dataset, + starting_year=0, + fit_method=self.fit_method) + # Checks that parameters returned are indeed different + mle_params_estimated_year1 = estimator.function_from_fit.get_params(np.array([1])).to_dict() + print(mle_params_estimated_year1, type(mle_params_estimated_year1)) + mle_params_estimated_year3 = estimator.function_from_fit.get_params(np.array([21])).to_dict() + mle_params_estimated_year5 = estimator.function_from_fit.get_params(np.array([self.last_year])).to_dict() + self.assertNotEqual(mle_params_estimated_year1, mle_params_estimated_year3) + self.assertNotEqual(mle_params_estimated_year3, mle_params_estimated_year5) + # Assert the relationship for the location is different between the beginning and the end + diff1 = mle_params_estimated_year1[param_to_test] - mle_params_estimated_year3[param_to_test] + diff2 = mle_params_estimated_year3[param_to_test] - mle_params_estimated_year5[param_to_test] + self.assertNotAlmostEqual(diff1, diff2) + # Assert that indicators are correctly computed + self.assertAlmostEqual(estimator.result_from_model_fit.nllh, estimator.nllh()) + # self.assertAlmostEqual(estimator.result_from_model_fit.aic, estimator.aic()) + # self.assertAlmostEqual(estimator.result_from_model_fit.bic, estimator.bic()) + + # def test_gev_temporal_margin_fit_spline_two_linear_location(self): + # self.function_test_gev_temporal_margin_fit_non_stationary_spline(NonStationaryTwoLinearLocationModel, + # param_to_test=GevParams.LOC) + # + # def test_gev_temporal_margin_fit_spline_two_linear_scale(self): + # self.function_test_gev_temporal_margin_fit_non_stationary_spline(NonStationaryTwoLinearScaleModel, + # param_to_test=GevParams.SCALE) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_extreme_fit/test_model/test_margin_model.py b/test/test_extreme_fit/test_model/test_margin_model.py index b26149b5e73f06b29213eefca1174e24bac467a3..0ff9c4f70e576ce6b9fff7aa427dee669d854dba 100644 --- a/test/test_extreme_fit/test_model/test_margin_model.py +++ b/test/test_extreme_fit/test_model/test_margin_model.py @@ -1,10 +1,12 @@ import unittest from extreme_fit.model.margin_model.linear_margin_model.linear_margin_model import LinearAllParametersAllDimsMarginModel -from extreme_fit.model.margin_model.spline_margin_model import Degree1SplineMarginModel from extreme_fit.distribution.gev.gev_params import GevParams +from extreme_fit.model.margin_model.spline_margin_model.temporal_spline_model_degree_1 import NonStationaryTwoLinearLocationModel from spatio_temporal_dataset.coordinates.spatial_coordinates.coordinates_1D import LinSpaceSpatialCoordinates from spatio_temporal_dataset.coordinates.spatial_coordinates.coordinates_2D import LinSpaceSpatial2DCoordinates +from spatio_temporal_dataset.coordinates.temporal_coordinates.generated_temporal_coordinates import \ + ConsecutiveTemporalCoordinates from test.test_utils import load_test_spatiotemporal_coordinates @@ -32,27 +34,17 @@ class TestVisualizationLinearMarginModel(unittest.TestCase): class TestVisualizationSplineMarginModel(unittest.TestCase): - DISPLAY = False - nb_points = 2 - margin_model_class = Degree1SplineMarginModel - - def tearDown(self) -> None: - self.margin_model.margin_function.visualize_function(show=self.DISPLAY) - self.assertTrue(True) + DISPLAY = True + nb_points = 50 + margin_model_class = NonStationaryTwoLinearLocationModel - def test_example_visualization_1D_spline(self): - coordinates = LinSpaceSpatialCoordinates.from_nb_points(nb_points=self.nb_points, start=0.0) - self.margin_model = self.margin_model_class(coordinates=coordinates, params_user={(GevParams.SHAPE, 1): 0.02}) + # def tearDown(self) -> None: + # self.margin_model.margin_function.visualize_function(show=self.DISPLAY) + # self.assertTrue(True) - def test_example_visualization_2D_spatial_spline(self): - spatial_coordinates = LinSpaceSpatial2DCoordinates.from_nb_points(nb_points=self.nb_points) - self.margin_model = self.margin_model_class(coordinates=spatial_coordinates) - # TODO: add a similar test than in the linear case - # # Assert that the grid correspond to what we expect in a simple case - # AbstractMarginFunction.VISUALIZATION_RESOLUTION = 2 - # grid = self.margin_model.margin_function.grid_2D['loc'] - # true_grid = np.array([[0.98, 1.0], [1.0, 1.02]]) - # self.assertTrue((grid == true_grid).all(), msg="\nexpected:\n{}, \nfound:\n{}".format(true_grid, grid)) + # def test_example_visualization_1D_spline(self): + # coordinates = ConsecutiveTemporalCoordinates.from_nb_temporal_steps(nb_temporal_steps=self.nb_points) + # self.margin_model = self.margin_model_class(coordinates=coordinates) if __name__ == '__main__': diff --git a/test/test_extreme_trend/test_ensemble_fit.py b/test/test_extreme_trend/test_ensemble_fit.py index beab3a9889379a60a5a79bf640143be02f16153b..a393174a2cec3888ac6bacf8a285fbd1f7ec02bb 100644 --- a/test/test_extreme_trend/test_ensemble_fit.py +++ b/test/test_extreme_trend/test_ensemble_fit.py @@ -3,8 +3,6 @@ import unittest from extreme_data.meteo_france_data.adamont_data.adamont.adamont_safran import AdamontSnowfall from extreme_data.meteo_france_data.adamont_data.adamont_scenario import AdamontScenario from extreme_data.meteo_france_data.scm_models_data.altitudes_studies import AltitudesStudies -from extreme_fit.model.margin_model.linear_margin_model.margin_model_with_effect.margin_model_with_gcm_effect import \ - StationaryAltitudinalWithGCMEffect from extreme_fit.model.margin_model.polynomial_margin_model.gev_altitudinal_models import StationaryAltitudinal from extreme_fit.model.margin_model.polynomial_margin_model.models_based_on_pariwise_analysis.gev_with_constant_shape_wrt_altitude import \ AltitudinalShapeConstantTimeLocationLinear, AltitudinalShapeConstantTimeScaleLinear, \ @@ -49,8 +47,7 @@ class TestEnsembleFit(unittest.TestCase): self.assertTrue(True) def test_ensembe_fit_with_effect(self): - model_classes = [StationaryAltitudinal, - StationaryAltitudinalWithGCMEffect][:1] + model_classes = [StationaryAltitudinal][:] for temporal_covariate in [TimeTemporalCovariate, AnomalyTemperatureWithSplineTemporalCovariate]: @@ -63,7 +60,6 @@ class TestEnsembleFit(unittest.TestCase): model_class_to_estimator = ensemble_fit.visualizer.massif_name_to_one_fold_fit[self.massif_names[0]].model_class_to_estimator model_class_to_expected_number_params = { StationaryAltitudinal: 5, - StationaryAltitudinalWithGCMEffect: 6, } for model_class in model_classes: expected = model_class_to_expected_number_params[model_class]