Commit 6172bedc authored by Le Roux Erwan's avatar Le Roux Erwan
Browse files

[projection snowfall] add the 27 non stationary spline models of interest.

parent c85b1f9c
No related merge requests found
Showing with 289 additions and 73 deletions
+289 -73
......@@ -4,45 +4,62 @@ from extreme_fit.function.param_function.polynomial_coef import PolynomialCoef,
from extreme_fit.function.param_function.spline_coef import SplineCoef, SplineAllCoef
def load_param_name_to_polynomial_all_coef(param_name_to_list_dim_and_degree,
param_name_and_dim_and_degree_to_default_coef):
param_name_to_polynomial_all_coef = {}
param_names = list(set([e[0] for e in param_name_and_dim_and_degree_to_default_coef.keys()]))
for param_name in param_names:
dim_to_polynomial_coef = {}
for dim, max_degree in param_name_to_list_dim_and_degree.get(param_name, []):
degree_to_coef = {}
for (param_name_loop, dim_loop, degree), coef in param_name_and_dim_and_degree_to_default_coef.items():
if param_name == param_name_loop and dim == dim_loop and degree <= max_degree:
degree_to_coef[degree] = coef
polynomial_coef = PolynomialCoef(param_name, degree_to_coef=degree_to_coef)
dim_to_polynomial_coef[dim] = polynomial_coef
if len(dim_to_polynomial_coef) == 0:
intercept = param_name_and_dim_and_degree_to_default_coef[(param_name, 0, 0)]
dim_to_polynomial_coef = None
else:
intercept = None
polynomial_all_coef = PolynomialAllCoef(param_name=param_name,
dim_to_polynomial_coef=dim_to_polynomial_coef,
intercept=intercept)
param_name_to_polynomial_all_coef[param_name] = polynomial_all_coef
return param_name_to_polynomial_all_coef
def load_polynomial_all_coef(param_name, param_name_and_dim_and_degree_to_default_coef,
param_name_to_list_dim_and_degree):
dim_to_polynomial_coef = {}
for dim, max_degree in param_name_to_list_dim_and_degree.get(param_name, []):
degree_to_coef = {}
for (param_name_loop, dim_loop, degree), coef in param_name_and_dim_and_degree_to_default_coef.items():
if param_name == param_name_loop and dim == dim_loop and degree <= max_degree:
degree_to_coef[degree] = coef
polynomial_coef = PolynomialCoef(param_name, degree_to_coef=degree_to_coef)
dim_to_polynomial_coef[dim] = polynomial_coef
if len(dim_to_polynomial_coef) == 0:
intercept = param_name_and_dim_and_degree_to_default_coef[(param_name, 0, 0)]
dim_to_polynomial_coef = None
else:
intercept = None
polynomial_all_coef = PolynomialAllCoef(param_name=param_name,
dim_to_polynomial_coef=dim_to_polynomial_coef,
intercept=intercept)
return polynomial_all_coef
def load_spline_all_coef(param_name, param_name_to_list_dim_and_degree_and_nb_intervals):
dim_to_spline_coef = {}
for dim, max_degree, nb_intervals in param_name_to_list_dim_and_degree_and_nb_intervals.get(param_name, []):
nb_coefficients = nb_intervals + 1
coefficients = np.arange(nb_coefficients)
knots = np.arange(nb_coefficients + max_degree + 1)
dim_to_spline_coef[dim] = SplineCoef(param_name, knots=knots, coefficients=coefficients)
if len(dim_to_spline_coef) == 0:
dim_to_spline_coef = None
spline_all_coef = SplineAllCoef(param_name=param_name,
dim_to_spline_coef=dim_to_spline_coef)
return spline_all_coef
def load_param_name_to_spline_all_coef(param_name_to_list_dim_and_degree_and_nb_intervals,
param_name_and_dim_and_degree_to_default_coef):
param_name_and_dim_and_degree_to_default_coef):
param_name_to_spline_all_coef = {}
param_names = list(set([e[0] for e in param_name_and_dim_and_degree_to_default_coef.keys()]))
for param_name in param_names:
dim_to_spline_coef = {}
for dim, max_degree, nb_intervals in param_name_to_list_dim_and_degree_and_nb_intervals.get(param_name, []):
nb_coefficients = nb_intervals + 1
coefficients = np.arange(nb_coefficients)
knots = np.arange(nb_coefficients + max_degree + 1)
dim_to_spline_coef[dim] = SplineCoef(param_name, knots=knots, coefficients=coefficients)
if len(dim_to_spline_coef) == 0:
dim_to_spline_coef = None
spline_all_coef = SplineAllCoef(param_name=param_name,
dim_to_spline_coef=dim_to_spline_coef)
param_name_to_spline_all_coef[param_name] = spline_all_coef
list_dim_and_degree_and_nb_intervals = param_name_to_list_dim_and_degree_and_nb_intervals.get(param_name, [])
if (len(list_dim_and_degree_and_nb_intervals) > 0) and (len(list_dim_and_degree_and_nb_intervals[0]) == 3):
all_coef = load_spline_all_coef(param_name, param_name_to_list_dim_and_degree_and_nb_intervals)
else:
all_coef = load_polynomial_all_coef(param_name, param_name_and_dim_and_degree_to_default_coef,
param_name_to_list_dim_and_degree_and_nb_intervals)
param_name_to_spline_all_coef[param_name] = all_coef
return param_name_to_spline_all_coef
def load_param_name_to_polynomial_all_coef(param_name_to_list_dim_and_degree,
param_name_and_dim_and_degree_to_default_coef):
param_name_to_polynomial_all_coef = {}
param_names = list(set([e[0] for e in param_name_and_dim_and_degree_to_default_coef.keys()]))
for param_name in param_names:
polynomial_all_coef = load_polynomial_all_coef(param_name, param_name_and_dim_and_degree_to_default_coef,
param_name_to_list_dim_and_degree)
param_name_to_polynomial_all_coef[param_name] = polynomial_all_coef
return param_name_to_polynomial_all_coef
......@@ -32,7 +32,7 @@ class SplineMarginModel(AbstractTemporalLinearMarginModel):
# Assert the order of list of dim and degree, to match the order of the form dict,
# i.e. 1) spatial individual terms 2) combined terms 3) temporal individual terms
for param_name, list_dim_and_degree_and_nb_intervals in param_name_to_list_dim_and_degree_and_nb_intervals.items():
dims = [d for d, m, nb in list_dim_and_degree_and_nb_intervals]
dims = [d for d, *_ in list_dim_and_degree_and_nb_intervals]
assert all([isinstance(d, int) or isinstance(d, tuple) for d in dims])
if self.coordinates.has_spatial_coordinates and self.coordinates.idx_x_coordinates in dims:
assert dims.index(self.coordinates.idx_x_coordinates) == 0
......@@ -40,7 +40,8 @@ class SplineMarginModel(AbstractTemporalLinearMarginModel):
assert dims.index(self.coordinates.idx_temporal_coordinates) == len(dims) - 1
# Assert that the degree are inferior to the max degree
for list_dim_and_degree_and_nb_intervals in param_name_to_list_dim_and_degree_and_nb_intervals.values():
for _, max_degree, _ in list_dim_and_degree_and_nb_intervals:
for _, *max_degree in list_dim_and_degree_and_nb_intervals:
max_degree = max_degree if isinstance(max_degree, float) else max_degree[0]
assert max_degree <= self.max_degree, 'Max degree (={}) specified is too high'.format(max_degree)
# Load param_name_to_spline_all_coef
param_name_to_spline_all_coef = load_param_name_to_spline_all_coef(
......
......@@ -28,17 +28,157 @@ class NonStationaryTwoLinearShapeModel(SplineMarginModel):
class NonStationaryTwoLinearLocationOneLinearScaleModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
# Degree 1, Two Linear sections for the location
return super().load_margin_function({
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1)]
})
class NonStationaryTwoLinearLocationOneLinearShapeModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1)]
})
class NonStationaryTwoLinearLocationScaleModel(SplineMarginModel):
class NonStationaryTwoLinearScaleOneLinearLocModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
# Degree 1, Two Linear sections for the scale parameters
return super().load_margin_function({GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)]})
return super().load_margin_function({
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1)]
})
class NonStationaryTwoLinearScaleOneLinearShapeModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1)]
})
class NonStationaryTwoLinearShapeOneLinearLocModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1)]
})
class NonStationaryTwoLinearShapeOneLinearScaleModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1)]
})
# Two Linearity with two one linearity
class NonStationaryTwoLinearLocationOneLinearScaleAndShapeModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1)]
})
class NonStationaryTwoLinearScaleOneLinearLocAndShapeModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1)]
})
class NonStationaryTwoLinearShapeOneLinearLocAndScaleModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1)],
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1)]
})
# two linearity two times
class NonStationaryTwoLinearLocationAndScaleModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
})
class NonStationaryTwoLinearScaleAndShapeModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
})
class NonStationaryTwoLinearLocationAndShape(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
})
# two linearity two times and one linearity
class NonStationaryTwoLinearLocationAndScaleOneLinearShapeModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1)],
})
class NonStationaryTwoLinearScaleAndShapeOneLinearLocModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1)],
})
class NonStationaryTwoLinearLocationAndShapeOneLinearScaleModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1)],
})
# two linearity three times
class NonStationaryTwoLinearLocationAndScaleAndShapeModel(SplineMarginModel):
def load_margin_function(self, param_name_to_dims=None):
return super().load_margin_function({
GevParams.LOC: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SHAPE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
GevParams.SCALE: [(self.coordinates.idx_temporal_coordinates, 1, 2)],
})
......@@ -84,15 +84,22 @@ class VisualizerForProjectionEnsemble(object):
ensemble_class_to_ensemble_fit[ensemble_fit_class] = ensemble_fit
self.altitude_class_to_ensemble_class_to_ensemble_fit[altitude_class] = ensemble_class_to_ensemble_fit
@property
def has_elevation_non_stationarity(self):
return all([len(a) > 1 for a in self.altitudes_list])
def plot_for_visualizer_list(self, visualizer_list):
with_significance = False
for v in visualizer_list:
v.plot_moments()
# plot_histogram_all_trends_against_altitudes(self.massif_names, visualizer_list,
# with_significance=with_significance)
# for relative in [True, False]:
# plot_shoe_plot_changes_against_altitude(self.massif_names, visualizer_list, relative=relative,
# with_significance=with_significance)
if self.has_elevation_non_stationarity:
with_significance = False
for v in visualizer_list:
v.plot_moments()
plot_histogram_all_trends_against_altitudes(self.massif_names, visualizer_list,
with_significance=with_significance)
for relative in [True, False]:
plot_shoe_plot_changes_against_altitude(self.massif_names, visualizer_list, relative=relative,
with_significance=with_significance)
else:
print('here plot for one altitude visualizer')
def plot(self):
# Set limit for the plot
......
from extreme_fit.model.margin_model.linear_margin_model.temporal_linear_margin_models import StationaryTemporalModel, \
NonStationaryLocationTemporalModel, NonStationaryScaleTemporalModel, NonStationaryShapeTemporalModel
NonStationaryLocationTemporalModel, NonStationaryScaleTemporalModel, NonStationaryShapeTemporalModel, \
NonStationaryScaleAndShapeTemporalModel, NonStationaryLocationAndScaleAndShapeTemporalModel, \
NonStationaryLocationAndShapeTemporalModel, NonStationaryLocationAndScaleTemporalModel
from extreme_fit.model.margin_model.spline_margin_model.temporal_spline_model_degree_1 import \
NonStationaryTwoLinearLocationModel, NonStationaryTwoLinearShapeModel
NonStationaryTwoLinearLocationModel, NonStationaryTwoLinearShapeModel, \
NonStationaryTwoLinearShapeOneLinearScaleModel, NonStationaryTwoLinearScaleOneLinearShapeModel, \
NonStationaryTwoLinearScaleAndShapeModel, NonStationaryTwoLinearShapeOneLinearLocAndScaleModel, \
NonStationaryTwoLinearScaleOneLinearLocAndShapeModel, NonStationaryTwoLinearShapeOneLinearLocModel, \
NonStationaryTwoLinearScaleOneLinearLocModel, NonStationaryTwoLinearScaleAndShapeOneLinearLocModel, \
NonStationaryTwoLinearLocationOneLinearScaleModel, NonStationaryTwoLinearLocationOneLinearScaleAndShapeModel, \
NonStationaryTwoLinearLocationOneLinearShapeModel, NonStationaryTwoLinearScaleModel, \
NonStationaryTwoLinearLocationAndShapeOneLinearScaleModel, NonStationaryTwoLinearLocationAndScaleAndShapeModel, \
NonStationaryTwoLinearLocationAndScaleOneLinearShapeModel, NonStationaryTwoLinearLocationAndScaleModel, \
NonStationaryTwoLinearLocationAndShape
SPLINE_MODELS_FOR_PROJECTION_ONE_ALTITUDE = [
# 1 model with three parameters
StationaryTemporalModel,
# 3 models with four parameters
NonStationaryLocationTemporalModel,
# Models with a constant Location parameter
StationaryTemporalModel,
# Simple linearity for the others
NonStationaryScaleTemporalModel,
NonStationaryShapeTemporalModel,
NonStationaryScaleAndShapeTemporalModel,
# Double linearity for the others
NonStationaryTwoLinearScaleModel,
NonStationaryTwoLinearShapeModel,
NonStationaryTwoLinearShapeOneLinearScaleModel,
NonStationaryTwoLinearScaleOneLinearShapeModel,
NonStationaryTwoLinearScaleAndShapeModel,
# Models with a linear location parameter
NonStationaryLocationTemporalModel,
# Simple linearity for the others
NonStationaryLocationAndScaleTemporalModel,
NonStationaryLocationAndShapeTemporalModel,
NonStationaryLocationAndScaleAndShapeTemporalModel,
# Double linearity for the others
NonStationaryTwoLinearScaleOneLinearLocModel,
NonStationaryTwoLinearShapeOneLinearLocModel,
NonStationaryTwoLinearScaleOneLinearLocAndShapeModel,
NonStationaryTwoLinearShapeOneLinearLocAndScaleModel,
NonStationaryTwoLinearScaleAndShapeOneLinearLocModel,
# Location only non-stationarity
# Models with linear location parameter with double linearity
NonStationaryTwoLinearLocationModel,
# Simple linearity for the others
NonStationaryTwoLinearLocationOneLinearScaleModel,
NonStationaryTwoLinearLocationOneLinearShapeModel,
NonStationaryTwoLinearLocationOneLinearScaleAndShapeModel,
# Double Linearity for the others
NonStationaryTwoLinearLocationAndScaleModel,
NonStationaryTwoLinearLocationAndScaleOneLinearShapeModel,
NonStationaryTwoLinearLocationAndShape,
NonStationaryTwoLinearLocationAndShapeOneLinearScaleModel,
NonStationaryTwoLinearLocationAndScaleAndShapeModel,
# Scale only non-stationarity
NonStationaryTwoLinearShapeModel,
]
# Shape only non-stationarity
#
]
if __name__ == '__main__':
print(len(set(SPLINE_MODELS_FOR_PROJECTION_ONE_ALTITUDE)))
\ No newline at end of file
......@@ -9,7 +9,7 @@ from extreme_fit.model.margin_model.polynomial_margin_model.polynomial_margin_mo
NonStationaryQuadraticScaleModel, NonStationaryQuadraticLocationGumbelModel, NonStationaryQuadraticScaleGumbelModel
from extreme_fit.model.margin_model.spline_margin_model.temporal_spline_model_degree_1 import \
NonStationaryTwoLinearLocationModel, NonStationaryTwoLinearShapeModel, \
NonStationaryTwoLinearLocationOneLinearScaleModel
NonStationaryTwoLinearLocationOneLinearScaleModel, NonStationaryTwoLinearLocationAndScaleAndShapeModel
from extreme_trend.trend_test.abstract_gev_trend_test import fitted_linear_margin_estimator
from extreme_fit.model.margin_model.utils import \
MarginFitMethod
......@@ -33,9 +33,10 @@ class TestGevTemporalSpline(unittest.TestCase):
start_loc = 0; start_scale = 1; start_shape = 1
""")
# Compute the stationary temporal margin with isMev
self.start_year = -25
self.start_year = 0
nb_years = 51
self.last_year = self.start_year + nb_years - 1
self.middle_year = (self.last_year + self.start_year) / 2
years = np.array(range(self.start_year, self.start_year + nb_years))
df = pd.DataFrame({AbstractCoordinates.COORDINATE_T: years})
self.coordinates = AbstractTemporalCoordinates.from_df(df)
......@@ -51,14 +52,20 @@ class TestGevTemporalSpline(unittest.TestCase):
starting_year=None,
fit_method=self.fit_method)
# Checks that parameters returned are indeed different
mle_params_estimated_year1 = estimator.function_from_fit.get_params(np.array([0])).to_dict()
mle_params_estimated_year3 = estimator.function_from_fit.get_params(np.array([21])).to_dict()
mle_params_estimated_year5 = estimator.function_from_fit.get_params(np.array([self.last_year])).to_dict()
self.assertNotEqual(mle_params_estimated_year1, mle_params_estimated_year3)
self.assertNotEqual(mle_params_estimated_year3, mle_params_estimated_year5)
print(self.start_year, self.middle_year, self.last_year)
mle_params_estimated_year_first = estimator.function_from_fit.get_params(np.array([self.start_year])).to_dict()
mle_params_estimated_year_middle = estimator.function_from_fit.get_params(np.array([self.middle_year])).to_dict()
mle_params_estimated_year_last = estimator.function_from_fit.get_params(np.array([self.last_year])).to_dict()
self.assertNotEqual(mle_params_estimated_year_first, mle_params_estimated_year_middle)
self.assertNotEqual(mle_params_estimated_year_middle, mle_params_estimated_year_last)
# # Assert the relationship for the location is different between the beginning and the end
diff1 = mle_params_estimated_year1[param_to_test] - mle_params_estimated_year3[param_to_test]
diff2 = mle_params_estimated_year3[param_to_test] - mle_params_estimated_year5[param_to_test]
if param_to_test != GevParams.SCALE:
diff1 = mle_params_estimated_year_first[param_to_test] - mle_params_estimated_year_middle[param_to_test]
diff2 = mle_params_estimated_year_middle[param_to_test] - mle_params_estimated_year_last[param_to_test]
else:
# because we use log scaling for the fit, so the relationship is w.r.t. the log of the parameter
diff1 = np.log(mle_params_estimated_year_first[param_to_test]) - np.log(mle_params_estimated_year_middle[param_to_test])
diff2 = np.log(mle_params_estimated_year_middle[param_to_test]) - np.log(mle_params_estimated_year_last[param_to_test])
self.assertNotAlmostEqual(diff1, diff2)
for idx, nb_year in enumerate(range(5)):
......@@ -82,9 +89,15 @@ class TestGevTemporalSpline(unittest.TestCase):
self.function_test_gev_temporal_margin_fit_non_stationary_spline(NonStationaryTwoLinearShapeModel,
param_to_test=GevParams.SCALE)
# def test_gev_temporal_margin_fit_spline_two_linear_location_with_added_linearity_in_scale(self):
# self.function_test_gev_temporal_margin_fit_non_stationary_spline(NonStationaryTwoLinearLocationOneLinearScaleModel,
# param_to_test=GevParams.SCALE)
def test_gev_temporal_margin_fit_spline_two_linear_location_with_added_linearity_in_scale(self):
self.function_test_gev_temporal_margin_fit_non_stationary_spline(NonStationaryTwoLinearLocationOneLinearScaleModel,
param_to_test=GevParams.LOC)
def test_gev_temporal_margin_fit_spline_two_linear_location_with_two_linearity_for_the_others(self):
self.function_test_gev_temporal_margin_fit_non_stationary_spline(NonStationaryTwoLinearLocationAndScaleAndShapeModel,
param_to_test=GevParams.SCALE)
if __name__ == '__main__':
unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment