Commit 873cb9a4 authored by Le Roux Erwan's avatar Le Roux Erwan
Browse files

[SCM][NON STATIONARITY] add test_margin_temporal. fix issue in spatio_temporal_coordinates

parent d8718ec5
No related merge requests found
Showing with 125 additions and 132 deletions
+125 -132
...@@ -111,9 +111,9 @@ def complete_analysis(only_first_one=False): ...@@ -111,9 +111,9 @@ def complete_analysis(only_first_one=False):
def trend_analysis(): def trend_analysis():
save_to_file = False save_to_file = True
only_first_one = True only_first_one = False
altitudes = [300, 1200, 2100, 3000][3:] altitudes = [300, 1200, 2100, 3000][:]
study_classes = [CrocusSwe, CrocusDepth, SafranSnowfall, SafranRainfall, SafranTemperature] study_classes = [CrocusSwe, CrocusDepth, SafranSnowfall, SafranRainfall, SafranTemperature]
for study in study_iterator_global(study_classes, only_first_one=only_first_one, altitudes=altitudes): for study in study_iterator_global(study_classes, only_first_one=only_first_one, altitudes=altitudes):
study_visualizer = StudyVisualizer(study, save_to_file=save_to_file) study_visualizer = StudyVisualizer(study, save_to_file=save_to_file)
......
...@@ -84,7 +84,7 @@ class AbstractCoordinates(object): ...@@ -84,7 +84,7 @@ class AbstractCoordinates(object):
@classmethod @classmethod
def from_df_and_slicer(cls, df: pd.DataFrame, slicer_class: type, train_split_ratio: float = None): def from_df_and_slicer(cls, df: pd.DataFrame, slicer_class: type, train_split_ratio: float = None):
# All the index should be unique # All the index should be unique
assert len(set(df.index)) == len(df) assert len(set(df.index)) == len(df), 'df indices are not unique'
# Create a spatial split # Create a spatial split
s_split_spatial = s_split_from_df(df, cls.COORDINATE_X, cls.SPATIAL_SPLIT, train_split_ratio, True) s_split_spatial = s_split_from_df(df, cls.COORDINATE_X, cls.SPATIAL_SPLIT, train_split_ratio, True)
......
...@@ -23,8 +23,8 @@ class AbstractSpatioTemporalCoordinates(AbstractCoordinates): ...@@ -23,8 +23,8 @@ class AbstractSpatioTemporalCoordinates(AbstractCoordinates):
for t in range(nb_steps): for t in range(nb_steps):
df_time_step = df_spatial.copy() df_time_step = df_spatial.copy()
df_time_step[cls.COORDINATE_T] = start + t df_time_step[cls.COORDINATE_T] = start + t
index_suffix = index_type(t + len(df_spatial)) index_suffix = index_type(t * len(df_spatial))
time_step_index = [i + index_suffix for i in df_spatial.index] if t > 0 else df_spatial.index time_step_index = [i + index_suffix for i in df_spatial.index]
df_time_step.index = time_step_index df_time_step.index = time_step_index
df_time_steps.append(df_time_step) df_time_steps.append(df_time_step)
df_time_steps = pd.concat(df_time_steps) df_time_steps = pd.concat(df_time_steps)
......
...@@ -10,10 +10,11 @@ from experiment.meteo_france_SCM_study.safran.safran import SafranSnowfall, Exte ...@@ -10,10 +10,11 @@ from experiment.meteo_france_SCM_study.safran.safran import SafranSnowfall, Exte
from experiment.meteo_france_SCM_study.visualization.study_visualization.study_visualizer import StudyVisualizer from experiment.meteo_france_SCM_study.visualization.study_visualization.study_visualizer import StudyVisualizer
from test.test_utils import load_scm_studies from test.test_utils import load_scm_studies
class TestSCMAllStudy(unittest.TestCase): class TestSCMAllStudy(unittest.TestCase):
def test_extended_run(self): def test_extended_run(self):
for study_class in [ExtendedSafranSnowfall, ExtendedCrocusSwe]: for study_class in [ExtendedSafranSnowfall]:
for study in study_iterator(study_class, only_first_one=True, both_altitude=False, verbose=False): for study in study_iterator(study_class, only_first_one=True, both_altitude=False, verbose=False):
study_visualizer = StudyVisualizer(study, show=False, save_to_file=False) study_visualizer = StudyVisualizer(study, show=False, save_to_file=False)
study_visualizer.visualize_all_mean_and_max_graphs() study_visualizer.visualize_all_mean_and_max_graphs()
......
import unittest
import os
import os.path as op
from collections import OrderedDict
from typing import List, Dict
import matplotlib.pyplot as plt
import pandas as pd
from netCDF4 import Dataset
from experiment.meteo_france_SCM_study.abstract_variable import AbstractVariable
from experiment.meteo_france_SCM_study.massif import safran_massif_names_from_datasets
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.coordinates.spatial_coordinates.abstract_spatial_coordinates import \
AbstractSpatialCoordinates
from spatio_temporal_dataset.spatio_temporal_observations.annual_maxima_observations import AnnualMaxima
from utils import get_full_path, cached_property
#
# from test.test_utils import load_safran_objects
#
#
# class TestFullEstimators(unittest.TestCase):
#
# def test_gev_mle_per_massif(self):
# safran_1800_one_day = load_safran_objects()[0]
# df = safran_1800_one_day.df_gev_mle_each_massif
# self.assertAlmostEqual(df.values.sum(), 1131.4551665871832)
#
#
# if __name__ == '__main__':
# unittest.main()
# import unittest
#
# from experiment.meteo_france_SCM_study.safran import Safran
# from experiment.meteo_france_SCM_study.safran_visualizer import SafranVisualizer
#
#
# class TestSafranVisualizer(unittest.TestCase):
# DISPLAY = False
#
# def setUp(self):
# super().setUp()
# self.safran = Safran(1800, 1)
# self.safran_visualizer = SafranVisualizer(self.safran, show=self.DISPLAY)
#
# def tearDown(self) -> None:
# self.assertTrue(True)
#
# def test_safran_smooth_margin_estimator(self):
# self.safran_visualizer.visualize_smooth_margin_fit()
#
# def test_safran_independent_margin_fits(self):
# self.safran_visualizer.visualize_independent_margin_fits()
#
# def test_safran_full_estimator(self):
# self.safran_visualizer.visualize_full_fit()
#
#
# if __name__ == '__main__':
# unittest.main()
# import unittest
#
# from experiment.simulation.lin_space_simulation import LinSpaceSimulation
#
#
# class TestSimulation(unittest.TestCase):
# DISPLAY = False
#
# def test_lin_space(self):
# s = LinSpaceSimulation()
# s.fit(show=self.DISPLAY)
#
# if __name__ == '__main__':
# unittest.main()
# import unittest
#
#
# from experiment.simulation.abstract_simulation import AbstractSimulation
# from extreme_estimator.extreme_models.margin_model.smooth_margin_model import ConstantMarginModel, \
# LinearAllParametersAllDimsMarginModel
# from extreme_estimator.extreme_models.max_stable_model.max_stable_models import Smith
# from extreme_estimator.gev_params import GevParams
# from spatio_temporal_dataset.coordinates.spatial_coordinates.coordinates_1D import LinSpaceSpatialCoordinates
# from spatio_temporal_dataset.dataset.simulation_dataset import FullSimulatedDataset
#
#
# class TestSplitCurve(unittest.TestCase):
# DISPLAY = False
#
# class SplitCurveFastForTest(AbstractSimulation):
#
# def __init__(self, nb_fit: int = 1):
# super().__init__(nb_fit)
# self.nb_points = 50
# self.nb_obs = 60
# self.coordinates = LinSpaceSpatialCoordinates.from_nb_points(nb_points=self.nb_points, train_split_ratio=0.8)
# # MarginModel Linear with respect to the shape (from 0.01 to 0.02)
# params_sample = {
# (GevParams.GEV_LOC, 0): 10,
# (GevParams.GEV_SHAPE, 0): 1.0,
# (GevParams.GEV_SCALE, 0): 1.0,
# }
# self.margin_model = ConstantMarginModel(coordinates=self.coordinates, params_sample=params_sample)
# self.max_stable_model = Smith()
#
# def load_dataset(self):
# return FullSimulatedDataset.from_double_sampling(nb_obs=self.nb_obs, margin_model=self.margin_model,
# coordinates=self.coordinates,
# max_stable_model=self.max_stable_model)
#
# def test_split_curve(self):
# s = self.SplitCurveFastForTest(nb_fit=2)
# s.fit(show=self.DISPLAY)
#
#
# if __name__ == '__main__':
# unittest.main()
import unittest
import numpy as np
import pandas as pd
from extreme_estimator.estimator.margin_estimator.abstract_margin_estimator import LinearMarginEstimator
from extreme_estimator.extreme_models.margin_model.linear_margin_model import LinearNonStationaryLocationMarginModel
from extreme_estimator.extreme_models.margin_model.temporal_linear_margin_model import StationaryStationModel, \
NonStationaryStationModel
from extreme_estimator.extreme_models.utils import r, set_seed_r
from extreme_estimator.margin_fits.gev.gevmle_fit import GevMleFit
from extreme_estimator.margin_fits.gev.ismev_gev_fit import IsmevGevFit
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.coordinates.temporal_coordinates.abstract_temporal_coordinates import \
AbstractTemporalCoordinates
from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset
from spatio_temporal_dataset.dataset.simulation_dataset import MarginDataset
from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import \
AbstractSpatioTemporalObservations
from test.test_utils import load_test_spatiotemporal_coordinates, load_smooth_margin_models
class TestMarginTemporal(unittest.TestCase):
def setUp(self) -> None:
self.nb_points = 2
self.nb_steps = 5
self.nb_obs = 1
# Load some 2D spatial coordinates
self.coordinates = load_test_spatiotemporal_coordinates(nb_steps=self.nb_steps, nb_points=self.nb_points)[1]
self.start_year = 2
smooth_margin_models = LinearNonStationaryLocationMarginModel(coordinates=self.coordinates,
starting_point=self.start_year)
self.dataset = MarginDataset.from_sampling(nb_obs=self.nb_obs,
margin_model=smooth_margin_models,
coordinates=self.coordinates)
def test_loading_dataset(self):
self.assertTrue(True)
# def test_gev_temporal_margin_fit_stationary(self):
# # Create estimator
# margin_model = StationaryStationModel(self.coordinates)
# estimator = LinearMarginEstimator(self.dataset, margin_model)
# estimator.fit()
# ref = {'loc': 0.0219, 'scale': 1.0347, 'shape': 0.8295}
# for year in range(1, 3):
# mle_params_estimated = estimator.margin_function_fitted.get_gev_params(np.array([year])).to_dict()
# for key in ref.keys():
# self.assertAlmostEqual(ref[key], mle_params_estimated[key], places=3)
#
# def test_gev_temporal_margin_fit_nonstationary(self):
# # Create estimator
# margin_model = NonStationaryStationModel(self.coordinates)
# estimator = LinearMarginEstimator(self.dataset, margin_model)
# estimator.fit()
# self.assertNotEqual(estimator.margin_function_fitted.mu1_temporal_trend, 0.0)
# # Checks that parameters returned are indeed different
# mle_params_estimated_year1 = estimator.margin_function_fitted.get_gev_params(np.array([1])).to_dict()
# mle_params_estimated_year3 = estimator.margin_function_fitted.get_gev_params(np.array([3])).to_dict()
# self.assertNotEqual(mle_params_estimated_year1, mle_params_estimated_year3)
#
# def test_gev_temporal_margin_fit_nonstationary_with_start_point(self):
# # Create estimator
# estimator = self.fit_non_stationary_estimator(starting_point=3)
# self.assertNotEqual(estimator.margin_function_fitted.mu1_temporal_trend, 0.0)
# # Checks starting point parameter are well passed
# self.assertEqual(3, estimator.margin_function_fitted.starting_point)
# # Checks that parameters returned are indeed different
# mle_params_estimated_year1 = estimator.margin_function_fitted.get_gev_params(np.array([1])).to_dict()
# mle_params_estimated_year3 = estimator.margin_function_fitted.get_gev_params(np.array([3])).to_dict()
# self.assertEqual(mle_params_estimated_year1, mle_params_estimated_year3)
# mle_params_estimated_year5 = estimator.margin_function_fitted.get_gev_params(np.array([5])).to_dict()
# self.assertNotEqual(mle_params_estimated_year5, mle_params_estimated_year3)
#
# def fit_non_stationary_estimator(self, starting_point):
# margin_model = NonStationaryStationModel(self.coordinates, starting_point=starting_point + self.start_year)
# estimator = LinearMarginEstimator(self.dataset, margin_model)
# estimator.fit()
# return estimator
#
# def test_two_different_starting_points(self):
# # Create two different estimators
# estimator1 = self.fit_non_stationary_estimator(starting_point=3)
# estimator2 = self.fit_non_stationary_estimator(starting_point=28)
# mu1_estimator1 = estimator1.margin_function_fitted.mu1_temporal_trend
# mu1_estimator2 = estimator2.margin_function_fitted.mu1_temporal_trend
# print(mu1_estimator1, mu1_estimator2)
# self.assertNotEqual(mu1_estimator1, mu1_estimator2)
if __name__ == '__main__':
unittest.main()
...@@ -17,7 +17,7 @@ from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_tempor ...@@ -17,7 +17,7 @@ from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_tempor
AbstractSpatioTemporalObservations AbstractSpatioTemporalObservations
class TestGevTemporalMargin(unittest.TestCase): class TestGevTemporal(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
set_seed_r() set_seed_r()
...@@ -28,7 +28,8 @@ class TestGevTemporalMargin(unittest.TestCase): ...@@ -28,7 +28,8 @@ class TestGevTemporalMargin(unittest.TestCase):
start_loc = 0; start_scale = 1; start_shape = 1 start_loc = 0; start_scale = 1; start_shape = 1
""") """)
# Compute the stationary temporal margin with isMev # Compute the stationary temporal margin with isMev
df = pd.DataFrame({AbstractCoordinates.COORDINATE_T: range(50)}) self.start_year = 0
df = pd.DataFrame({AbstractCoordinates.COORDINATE_T: range(self.start_year, self.start_year + 50)})
self.coordinates = AbstractTemporalCoordinates.from_df(df) self.coordinates = AbstractTemporalCoordinates.from_df(df)
df2 = pd.DataFrame(data=np.array(r['x_gev']), index=df.index) df2 = pd.DataFrame(data=np.array(r['x_gev']), index=df.index)
observations = AbstractSpatioTemporalObservations(df_maxima_gev=df2) observations = AbstractSpatioTemporalObservations(df_maxima_gev=df2)
...@@ -50,7 +51,7 @@ class TestGevTemporalMargin(unittest.TestCase): ...@@ -50,7 +51,7 @@ class TestGevTemporalMargin(unittest.TestCase):
margin_model = NonStationaryStationModel(self.coordinates) margin_model = NonStationaryStationModel(self.coordinates)
estimator = LinearMarginEstimator(self.dataset, margin_model) estimator = LinearMarginEstimator(self.dataset, margin_model)
estimator.fit() estimator.fit()
self.assertNotEqual(estimator.result_from_fit.margin_coef_dict['tempCoeffLoc1'], 0.0) self.assertNotEqual(estimator.margin_function_fitted.mu1_temporal_trend, 0.0)
# Checks that parameters returned are indeed different # Checks that parameters returned are indeed different
mle_params_estimated_year1 = estimator.margin_function_fitted.get_gev_params(np.array([1])).to_dict() mle_params_estimated_year1 = estimator.margin_function_fitted.get_gev_params(np.array([1])).to_dict()
mle_params_estimated_year3 = estimator.margin_function_fitted.get_gev_params(np.array([3])).to_dict() mle_params_estimated_year3 = estimator.margin_function_fitted.get_gev_params(np.array([3])).to_dict()
...@@ -58,10 +59,8 @@ class TestGevTemporalMargin(unittest.TestCase): ...@@ -58,10 +59,8 @@ class TestGevTemporalMargin(unittest.TestCase):
def test_gev_temporal_margin_fit_nonstationary_with_start_point(self): def test_gev_temporal_margin_fit_nonstationary_with_start_point(self):
# Create estimator # Create estimator
margin_model = NonStationaryStationModel(self.coordinates, starting_point=3) estimator = self.fit_non_stationary_estimator(starting_point=3)
estimator = LinearMarginEstimator(self.dataset, margin_model) self.assertNotEqual(estimator.margin_function_fitted.mu1_temporal_trend, 0.0)
estimator.fit()
self.assertNotEqual(estimator.result_from_fit.margin_coef_dict['tempCoeffLoc1'], 0.0)
# Checks starting point parameter are well passed # Checks starting point parameter are well passed
self.assertEqual(3, estimator.margin_function_fitted.starting_point) self.assertEqual(3, estimator.margin_function_fitted.starting_point)
# Checks that parameters returned are indeed different # Checks that parameters returned are indeed different
...@@ -70,7 +69,21 @@ class TestGevTemporalMargin(unittest.TestCase): ...@@ -70,7 +69,21 @@ class TestGevTemporalMargin(unittest.TestCase):
self.assertEqual(mle_params_estimated_year1, mle_params_estimated_year3) self.assertEqual(mle_params_estimated_year1, mle_params_estimated_year3)
mle_params_estimated_year5 = estimator.margin_function_fitted.get_gev_params(np.array([5])).to_dict() mle_params_estimated_year5 = estimator.margin_function_fitted.get_gev_params(np.array([5])).to_dict()
self.assertNotEqual(mle_params_estimated_year5, mle_params_estimated_year3) self.assertNotEqual(mle_params_estimated_year5, mle_params_estimated_year3)
# todo: create same test with a starting value, to check if the starting value is taken into account in the margin_function_fitted
def fit_non_stationary_estimator(self, starting_point):
margin_model = NonStationaryStationModel(self.coordinates, starting_point=starting_point + self.start_year)
estimator = LinearMarginEstimator(self.dataset, margin_model)
estimator.fit()
return estimator
def test_two_different_starting_points(self):
# Create two different estimators
estimator1 = self.fit_non_stationary_estimator(starting_point=3)
estimator2 = self.fit_non_stationary_estimator(starting_point=28)
mu1_estimator1 = estimator1.margin_function_fitted.mu1_temporal_trend
mu1_estimator2 = estimator2.margin_function_fitted.mu1_temporal_trend
print(mu1_estimator1, mu1_estimator2)
self.assertNotEqual(mu1_estimator1, mu1_estimator2)
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -65,6 +65,9 @@ class SpatioTemporalCoordinates(unittest.TestCase): ...@@ -65,6 +65,9 @@ class SpatioTemporalCoordinates(unittest.TestCase):
# the uniqueness of each spatio temporal index is not garanteed by the current algo # the uniqueness of each spatio temporal index is not garanteed by the current algo
# it will work in classical cases, and raise an assert when uniqueness is needed (when using a slicer) # it will work in classical cases, and raise an assert when uniqueness is needed (when using a slicer)
index1 = pd.Series(spatial_coordinates.spatial_index()) index1 = pd.Series(spatial_coordinates.spatial_index())
# Add the suffix to the index1
suffix = '0' if isinstance(df_spatial.index[0], str) else 0
index1 += suffix
index2 = pd.Series(coordinates.spatial_index()) index2 = pd.Series(coordinates.spatial_index())
ind = index1 != index2 # type: pd.Series ind = index1 != index2 # type: pd.Series
self.assertEqual(sum(ind), 0, msg="spatial_coordinates:\n{} \n!= spatio_temporal_coordinates \n{}". self.assertEqual(sum(ind), 0, msg="spatial_coordinates:\n{} \n!= spatio_temporal_coordinates \n{}".
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment