From 7f77709228da2e2a9a651114d72ec65fe02f6ba9 Mon Sep 17 00:00:00 2001 From: Le Roux Erwan <erwan.le-roux@irstea.fr> Date: Tue, 7 May 2019 09:43:28 +0200 Subject: [PATCH] [EXTREME ESTIMATOR][MARGIN MODEL] add temporal linear margin model test. check that starting point are taken into account. --- .../estimator/abstract_estimator.py | 8 +- .../full_estimator/abstract_full_estimator.py | 6 +- .../abstract_margin_estimator.py | 9 ++- .../independent_margin_function.py | 4 +- .../margin_function/linear_margin_function.py | 40 ++++------ .../parametric_margin_function.py | 31 ++++---- .../test_gev/test_gev_temporal_margin.py | 77 +++++++++++++++++++ 7 files changed, 127 insertions(+), 48 deletions(-) create mode 100644 test/test_extreme_estimator/test_margin_fits/test_gev/test_gev_temporal_margin.py diff --git a/extreme_estimator/estimator/abstract_estimator.py b/extreme_estimator/estimator/abstract_estimator.py index 0dfb1643..857fef72 100644 --- a/extreme_estimator/estimator/abstract_estimator.py +++ b/extreme_estimator/estimator/abstract_estimator.py @@ -1,5 +1,6 @@ import time +from extreme_estimator.extreme_models.margin_model.linear_margin_model import LinearMarginModel from extreme_estimator.extreme_models.margin_model.margin_function.parametric_margin_function import \ ParametricMarginFunction from extreme_estimator.extreme_models.result_from_fit import ResultFromFit @@ -51,10 +52,11 @@ class AbstractEstimator(object): def extract_function_fitted(self) -> AbstractMarginFunction: raise NotImplementedError - def extract_function_fitted_from_function_to_fit(self, margin_function_to_fit: ParametricMarginFunction): + def extract_function_fitted_from_the_model_shape(self, margin_model: LinearMarginModel): return LinearMarginFunction.from_coef_dict(coordinates=self.dataset.coordinates, - gev_param_name_to_dims=margin_function_to_fit.gev_param_name_to_dims, - coef_dict=self.result_from_fit.margin_coef_dict) + gev_param_name_to_dims=margin_model.margin_function_start_fit.gev_param_name_to_dims, + coef_dict=self.result_from_fit.margin_coef_dict, + starting_point=margin_model.starting_point) # @property # def max_stable_fitted(self) -> AbstractMarginFunction: diff --git a/extreme_estimator/estimator/full_estimator/abstract_full_estimator.py b/extreme_estimator/estimator/full_estimator/abstract_full_estimator.py index 547b54f6..b01ba712 100644 --- a/extreme_estimator/estimator/full_estimator/abstract_full_estimator.py +++ b/extreme_estimator/estimator/full_estimator/abstract_full_estimator.py @@ -63,12 +63,12 @@ class FullEstimatorInASingleStepWithSmoothMargin(AbstractFullEstimator): df_coordinates_spat=self.dataset.coordinates.df_spatial_coordinates(self.train_split), df_coordinates_temp=self.dataset.coordinates.df_temporal_coordinates(self.train_split), fit_marge=True, - fit_marge_form_dict=self.margin_function_start_fit.form_dict, - margin_start_dict=self.margin_function_start_fit.coef_dict + fit_marge_form_dict=self.linear_margin_model.margin_function_start_fit.form_dict, + margin_start_dict=self.linear_margin_model.margin_function_start_fit.coef_dict ) def extract_function_fitted(self): - return self.extract_function_fitted_from_function_to_fit(self.margin_function_start_fit) + return self.extract_function_fitted_from_the_model_shape(self.linear_margin_model) class PointwiseAndThenUnitaryMsp(AbstractFullEstimator): diff --git a/extreme_estimator/estimator/margin_estimator/abstract_margin_estimator.py b/extreme_estimator/estimator/margin_estimator/abstract_margin_estimator.py index 7acdf0ce..38f8e5c3 100644 --- a/extreme_estimator/estimator/margin_estimator/abstract_margin_estimator.py +++ b/extreme_estimator/estimator/margin_estimator/abstract_margin_estimator.py @@ -5,6 +5,7 @@ from extreme_estimator.extreme_models.margin_model.margin_function.abstract_marg AbstractMarginFunction from extreme_estimator.extreme_models.margin_model.linear_margin_model import LinearMarginModel, \ LinearAllParametersAllDimsMarginModel +from extreme_estimator.extreme_models.margin_model.margin_function.linear_margin_function import LinearMarginFunction from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset @@ -35,6 +36,10 @@ class LinearMarginEstimator(AbstractMarginEstimator): df_coordinates_spat=df_coordinates_spat, df_coordinates_temp=df_coordinates_temp) - def extract_function_fitted(self): - return self.extract_function_fitted_from_function_to_fit(self.margin_model.margin_function_start_fit) + @property + def margin_function_fitted(self) -> LinearMarginFunction: + return super().margin_function_fitted + + def extract_function_fitted(self) -> LinearMarginFunction: + return self.extract_function_fitted_from_the_model_shape(self.margin_model) diff --git a/extreme_estimator/extreme_models/margin_model/margin_function/independent_margin_function.py b/extreme_estimator/extreme_models/margin_model/margin_function/independent_margin_function.py index c4d34c8d..25350f19 100644 --- a/extreme_estimator/extreme_models/margin_model/margin_function/independent_margin_function.py +++ b/extreme_estimator/extreme_models/margin_model/margin_function/independent_margin_function.py @@ -1,4 +1,4 @@ -from typing import Dict +from typing import Dict, Union import numpy as np @@ -17,7 +17,7 @@ class IndependentMarginFunction(AbstractMarginFunction): def __init__(self, coordinates: AbstractCoordinates): """Attribute 'gev_param_name_to_param_function' maps each GEV parameter to its corresponding function""" super().__init__(coordinates) - self.gev_param_name_to_param_function = None # type: Dict[str, AbstractParamFunction] + self.gev_param_name_to_param_function = None # type: Union[None, Dict[str, AbstractParamFunction]] def get_gev_params(self, coordinate: np.ndarray) -> GevParams: """Each GEV parameter is computed independently through its corresponding param_function""" diff --git a/extreme_estimator/extreme_models/margin_model/margin_function/linear_margin_function.py b/extreme_estimator/extreme_models/margin_model/margin_function/linear_margin_function.py index db54afeb..3341bf2e 100644 --- a/extreme_estimator/extreme_models/margin_model/margin_function/linear_margin_function.py +++ b/extreme_estimator/extreme_models/margin_model/margin_function/linear_margin_function.py @@ -1,13 +1,11 @@ -from typing import Dict, List - -import numpy as np +from typing import Dict, List, Union from extreme_estimator.extreme_models.margin_model.margin_function.parametric_margin_function import \ ParametricMarginFunction from extreme_estimator.extreme_models.margin_model.param_function.abstract_coef import AbstractCoef from extreme_estimator.extreme_models.margin_model.param_function.linear_coef import LinearCoef -from extreme_estimator.extreme_models.margin_model.param_function.param_function import ConstantParamFunction, \ - AbstractParamFunction, LinearParamFunction +from extreme_estimator.extreme_models.margin_model.param_function.param_function import AbstractParamFunction, \ + LinearParamFunction from extreme_estimator.margin_fits.gev.gev_params import GevParams from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates @@ -29,32 +27,15 @@ class LinearMarginFunction(ParametricMarginFunction): COEF_CLASS = LinearCoef def __init__(self, coordinates: AbstractCoordinates, gev_param_name_to_dims: Dict[str, List[int]], - gev_param_name_to_coef: Dict[str, AbstractCoef], - starting_point=None): - # Starting point for the trend is the same for all the parameters - self.starting_point = starting_point - self.gev_param_name_to_coef = None # type: Dict[str, LinearCoef] - super().__init__(coordinates, gev_param_name_to_dims, gev_param_name_to_coef) - - # @classmethod - # def from_coef_dict(cls, coordinates: AbstractCoordinates, gev_param_name_to_dims: Dict[str, List[int]], - # coef_dict: Dict[str, float]): - # return super().from_coef_dict(coordinates, gev_param_name_to_dims, coef_dict) + gev_param_name_to_coef: Dict[str, AbstractCoef], starting_point: Union[None, int] = None): + self.gev_param_name_to_coef = None # type: Union[None, Dict[str, LinearCoef]] + super().__init__(coordinates, gev_param_name_to_dims, gev_param_name_to_coef, starting_point) def load_specific_param_function(self, gev_param_name) -> AbstractParamFunction: return LinearParamFunction(dims=self.gev_param_name_to_dims[gev_param_name], coordinates=self.coordinates.coordinates_values(), linear_coef=self.gev_param_name_to_coef[gev_param_name]) - def get_gev_params(self, coordinate: np.ndarray) -> GevParams: - if self.starting_point is not None: - # Shift temporal coordinate to enable to model temporal trend with starting point - assert self.coordinates.has_temporal_coordinates - assert 0 <= self.coordinates.idx_temporal_coordinates < len(coordinate) - if coordinate[self.coordinates.idx_temporal_coordinates] < self.starting_point: - coordinate[self.coordinates.idx_temporal_coordinates] = self.starting_point - return super().get_gev_params(coordinate) - @classmethod def idx_to_coefficient_name(cls, coordinates: AbstractCoordinates) -> Dict[int, str]: # Intercept correspond to the dimension 0 @@ -68,6 +49,15 @@ class LinearMarginFunction(ParametricMarginFunction): def coefficient_name_to_dim(cls, coordinates: AbstractCoordinates) -> Dict[int, str]: return {v: k for k, v in cls.idx_to_coefficient_name(coordinates).items()} + @property + def coef_dict(self) -> Dict[str, float]: + coef_dict = {} + for gev_param_name in GevParams.PARAM_NAMES: + dims = self.gev_param_name_to_dims.get(gev_param_name, []) + coef = self.gev_param_name_to_coef[gev_param_name] + coef_dict.update(coef.coef_dict(dims, self.idx_to_coefficient_name(self.coordinates))) + return coef_dict + @property def form_dict(self) -> Dict[str, str]: form_dict = {} diff --git a/extreme_estimator/extreme_models/margin_model/margin_function/parametric_margin_function.py b/extreme_estimator/extreme_models/margin_model/margin_function/parametric_margin_function.py index 8a0e51ca..7476aa13 100644 --- a/extreme_estimator/extreme_models/margin_model/margin_function/parametric_margin_function.py +++ b/extreme_estimator/extreme_models/margin_model/margin_function/parametric_margin_function.py @@ -1,4 +1,6 @@ -from typing import Dict, List +from typing import Dict, List, Union + +import numpy as np from extreme_estimator.extreme_models.margin_model.margin_function.independent_margin_function import \ IndependentMarginFunction @@ -29,7 +31,9 @@ class ParametricMarginFunction(IndependentMarginFunction): COEF_CLASS = None def __init__(self, coordinates: AbstractCoordinates, gev_param_name_to_dims: Dict[str, List[int]], - gev_param_name_to_coef: Dict[str, AbstractCoef]): + gev_param_name_to_coef: Dict[str, AbstractCoef], starting_point: Union[None, int] = None): + # Starting point for the trend is the same for all the parameters + self.starting_point = starting_point super().__init__(coordinates) self.gev_param_name_to_dims = gev_param_name_to_dims # type: Dict[str, List[int]] @@ -57,9 +61,19 @@ class ParametricMarginFunction(IndependentMarginFunction): def load_specific_param_function(self, gev_param_name) -> AbstractParamFunction: raise NotImplementedError + def get_gev_params(self, coordinate: np.ndarray) -> GevParams: + print('here get gev', self.starting_point) + if self.starting_point is not None: + # Shift temporal coordinate to enable to model temporal trend with starting point + assert self.coordinates.has_temporal_coordinates + assert 0 <= self.coordinates.idx_temporal_coordinates < len(coordinate) + if coordinate[self.coordinates.idx_temporal_coordinates] < self.starting_point: + coordinate[self.coordinates.idx_temporal_coordinates] = self.starting_point + return super().get_gev_params(coordinate) + @classmethod def from_coef_dict(cls, coordinates: AbstractCoordinates, gev_param_name_to_dims: Dict[str, List[int]], - coef_dict: Dict[str, float]): + coef_dict: Dict[str, float], starting_point: Union[None, int] = None): assert cls.COEF_CLASS is not None, 'a COEF_CLASS class attributes needs to be defined' gev_param_name_to_coef = {} for gev_param_name in GevParams.PARAM_NAMES: @@ -67,16 +81,7 @@ class ParametricMarginFunction(IndependentMarginFunction): coef = cls.COEF_CLASS.from_coef_dict(coef_dict=coef_dict, gev_param_name=gev_param_name, dims=dims, coordinates=coordinates) gev_param_name_to_coef[gev_param_name] = coef - return cls(coordinates, gev_param_name_to_dims, gev_param_name_to_coef) - - @property - def coef_dict(self) -> Dict[str, float]: - coef_dict = {} - for gev_param_name in GevParams.PARAM_NAMES: - dims = self.gev_param_name_to_dims.get(gev_param_name, []) - coef = self.gev_param_name_to_coef[gev_param_name] - coef_dict.update(coef.coef_dict(dims, self.idx_to_coefficient_name(self.coordinates))) - return coef_dict + return cls(coordinates, gev_param_name_to_dims, gev_param_name_to_coef, starting_point) @property def form_dict(self) -> Dict[str, str]: diff --git a/test/test_extreme_estimator/test_margin_fits/test_gev/test_gev_temporal_margin.py b/test/test_extreme_estimator/test_margin_fits/test_gev/test_gev_temporal_margin.py new file mode 100644 index 00000000..176d0a03 --- /dev/null +++ b/test/test_extreme_estimator/test_margin_fits/test_gev/test_gev_temporal_margin.py @@ -0,0 +1,77 @@ +import unittest + +import numpy as np +import pandas as pd + +from extreme_estimator.estimator.margin_estimator.abstract_margin_estimator import LinearMarginEstimator +from extreme_estimator.extreme_models.margin_model.temporal_linear_margin_model import StationaryStationModel, \ + NonStationaryStationModel +from extreme_estimator.extreme_models.utils import r, set_seed_r +from extreme_estimator.margin_fits.gev.gevmle_fit import GevMleFit +from extreme_estimator.margin_fits.gev.ismev_gev_fit import IsmevGevFit +from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates +from spatio_temporal_dataset.coordinates.temporal_coordinates.abstract_temporal_coordinates import \ + AbstractTemporalCoordinates +from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset +from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import \ + AbstractSpatioTemporalObservations + + +class TestGevTemporalMargin(unittest.TestCase): + + def setUp(self) -> None: + set_seed_r() + r(""" + N <- 50 + loc = 0; scale = 1; shape <- 1 + x_gev <- rgev(N, loc = loc, scale = scale, shape = shape) + start_loc = 0; start_scale = 1; start_shape = 1 + """) + # Compute the stationary temporal margin with isMev + df = pd.DataFrame({AbstractCoordinates.COORDINATE_T: range(50)}) + self.coordinates = AbstractTemporalCoordinates.from_df(df) + df2 = pd.DataFrame(data=np.array(r['x_gev']), index=df.index) + observations = AbstractSpatioTemporalObservations(df_maxima_gev=df2) + self.dataset = AbstractDataset(observations=observations, coordinates=self.coordinates) + + def test_gev_temporal_margin_fit_stationary(self): + # Create estimator + margin_model = StationaryStationModel(self.coordinates) + estimator = LinearMarginEstimator(self.dataset, margin_model) + estimator.fit() + ref = {'loc': 0.0219, 'scale': 1.0347, 'shape': 0.8295} + for year in range(1, 3): + mle_params_estimated = estimator.margin_function_fitted.get_gev_params(np.array([year])).to_dict() + for key in ref.keys(): + self.assertAlmostEqual(ref[key], mle_params_estimated[key], places=3) + + def test_gev_temporal_margin_fit_nonstationary(self): + # Create estimator + margin_model = NonStationaryStationModel(self.coordinates) + estimator = LinearMarginEstimator(self.dataset, margin_model) + estimator.fit() + self.assertNotEqual(estimator.result_from_fit.margin_coef_dict['tempCoeffLoc1'], 0.0) + # Checks that parameters returned are indeed different + mle_params_estimated_year1 = estimator.margin_function_fitted.get_gev_params(np.array([1])).to_dict() + mle_params_estimated_year3 = estimator.margin_function_fitted.get_gev_params(np.array([3])).to_dict() + self.assertNotEqual(mle_params_estimated_year1, mle_params_estimated_year3) + + def test_gev_temporal_margin_fit_nonstationary_with_start_point(self): + # Create estimator + margin_model = NonStationaryStationModel(self.coordinates, starting_point=3) + estimator = LinearMarginEstimator(self.dataset, margin_model) + estimator.fit() + self.assertNotEqual(estimator.result_from_fit.margin_coef_dict['tempCoeffLoc1'], 0.0) + # Checks starting point parameter are well passed + self.assertEqual(3, estimator.margin_function_fitted.starting_point) + # Checks that parameters returned are indeed different + mle_params_estimated_year1 = estimator.margin_function_fitted.get_gev_params(np.array([1])).to_dict() + mle_params_estimated_year3 = estimator.margin_function_fitted.get_gev_params(np.array([3])).to_dict() + self.assertEqual(mle_params_estimated_year1, mle_params_estimated_year3) + mle_params_estimated_year5 = estimator.margin_function_fitted.get_gev_params(np.array([5])).to_dict() + self.assertNotEqual(mle_params_estimated_year5, mle_params_estimated_year3) + # todo: create same test with a starting value, to check if the starting value is taken into account in the margin_function_fitted + + +if __name__ == '__main__': + unittest.main() -- GitLab