Commit b9fb5cbc authored by Le Roux Erwan's avatar Le Roux Erwan
Browse files

[TEST][DATASET] major fix for abstract_dataset for the function...

[TEST][DATASET] major fix for abstract_dataset for the function transform_maxima in the spatio temporal case
parent 9f2e2b58
No related merge requests found
Showing with 62 additions and 13 deletions
+62 -13
...@@ -215,6 +215,9 @@ class AbstractCoordinates(object): ...@@ -215,6 +215,9 @@ class AbstractCoordinates(object):
def spatio_temporal_shape(self, split: Split.all) -> Tuple[int, int]: def spatio_temporal_shape(self, split: Split.all) -> Tuple[int, int]:
return len(self.df_spatial_coordinates(split)), len(self.df_temporal_coordinates(split)) return len(self.df_spatial_coordinates(split)), len(self.df_temporal_coordinates(split))
def ind_of_df_all_coordinates(self, coordinate_name, value):
return self.df_all_coordinates.loc[:, coordinate_name] == value
# Visualization # Visualization
@property @property
......
...@@ -56,8 +56,9 @@ class AbstractDataset(object): ...@@ -56,8 +56,9 @@ class AbstractDataset(object):
def transform_maxima_for_spatial_extreme_package(self, maxima_function, split) -> np.ndarray: def transform_maxima_for_spatial_extreme_package(self, maxima_function, split) -> np.ndarray:
array = maxima_function(split) array = maxima_function(split)
if self.coordinates.has_spatio_temporal_coordinates: if self.coordinates.has_spatio_temporal_coordinates:
array = array.reshape(self.coordinates.spatio_temporal_shape(split)) return array.reshape(self.coordinates.spatio_temporal_shape(split)[::-1])
return np.transpose(array) else:
return np.transpose(array)
def maxima_gev_for_spatial_extremes_package(self, split: Split = Split.all) -> np.ndarray: def maxima_gev_for_spatial_extremes_package(self, split: Split = Split.all) -> np.ndarray:
return self.transform_maxima_for_spatial_extreme_package(self.maxima_gev, split) return self.transform_maxima_for_spatial_extreme_package(self.maxima_gev, split)
...@@ -105,7 +106,7 @@ class AbstractDataset(object): ...@@ -105,7 +106,7 @@ class AbstractDataset(object):
# Special methods # Special methods
def __str__(self) -> str: def __str__(self) -> str:
return 'coordinates: {}\nobservations: {}'.format(self.coordinates.__str__(), self.observations.__str__()) return 'coordinates:\n{}\nobservations:\n{}'.format(self.coordinates.__str__(), self.observations.__str__())
def get_subset_dataset(dataset: AbstractDataset, subset_id) -> AbstractDataset: def get_subset_dataset(dataset: AbstractDataset, subset_id) -> AbstractDataset:
......
...@@ -31,10 +31,10 @@ class TestMarginTemporal(unittest.TestCase): ...@@ -31,10 +31,10 @@ class TestMarginTemporal(unittest.TestCase):
self.nb_obs = 1 self.nb_obs = 1
# Load some 2D spatial coordinates # Load some 2D spatial coordinates
self.coordinates = load_test_spatiotemporal_coordinates(nb_steps=self.nb_steps, nb_points=self.nb_points)[1] self.coordinates = load_test_spatiotemporal_coordinates(nb_steps=self.nb_steps, nb_points=self.nb_points)[1]
smooth_margin_model = LinearNonStationaryLocationMarginModel(coordinates=self.coordinates, self.smooth_margin_model = LinearNonStationaryLocationMarginModel(coordinates=self.coordinates,
starting_point=2) starting_point=2)
self.dataset = MarginDataset.from_sampling(nb_obs=self.nb_obs, self.dataset = MarginDataset.from_sampling(nb_obs=self.nb_obs,
margin_model=smooth_margin_model, margin_model=self.smooth_margin_model,
coordinates=self.coordinates) coordinates=self.coordinates)
def test_margin_fit_stationary(self): def test_margin_fit_stationary(self):
...@@ -42,7 +42,7 @@ class TestMarginTemporal(unittest.TestCase): ...@@ -42,7 +42,7 @@ class TestMarginTemporal(unittest.TestCase):
margin_model = LinearStationaryMarginModel(self.coordinates) margin_model = LinearStationaryMarginModel(self.coordinates)
estimator = LinearMarginEstimator(self.dataset, margin_model) estimator = LinearMarginEstimator(self.dataset, margin_model)
estimator.fit() estimator.fit()
ref = {'loc': 1.3403346591679877, 'scale': 1.054867229157924, 'shape': 0.713700960727747} ref = {'loc': 1.3456595684773085, 'scale': 1.090369430386199, 'shape': 0.6845422250749476}
for year in range(1, 3): for year in range(1, 3):
coordinate = np.array([0.0, 0.0, year]) coordinate = np.array([0.0, 0.0, year])
mle_params_estimated = estimator.margin_function_fitted.get_gev_params(coordinate).to_dict() mle_params_estimated = estimator.margin_function_fitted.get_gev_params(coordinate).to_dict()
...@@ -65,8 +65,11 @@ class TestMarginTemporal(unittest.TestCase): ...@@ -65,8 +65,11 @@ class TestMarginTemporal(unittest.TestCase):
def test_margin_fit_nonstationary_with_start_point(self): def test_margin_fit_nonstationary_with_start_point(self):
# Create estimator # Create estimator
estimator = self.fit_non_stationary_estimator(starting_point=2) estimator = self.fit_non_stationary_estimator(starting_point=2)
print(estimator.margin_function_fitted.mu1_temporal_trend) # By default, estimator find the good margin
self.assertNotEqual(estimator.margin_function_fitted.mu1_temporal_trend, 0.0) self.assertNotEqual(estimator.margin_function_fitted.mu1_temporal_trend, 0.0)
self.assertAlmostEqual(estimator.margin_function_fitted.mu1_temporal_trend,
self.smooth_margin_model.margin_function_sample.mu1_temporal_trend,
places=3)
# Checks starting point parameter are well passed # Checks starting point parameter are well passed
self.assertEqual(2, estimator.margin_function_fitted.starting_point) self.assertEqual(2, estimator.margin_function_fitted.starting_point)
# Checks that parameters returned are indeed different # Checks that parameters returned are indeed different
......
from rpy2.rinterface import RRuntimeError
import unittest import unittest
from itertools import product from itertools import product
from spatio_temporal_dataset.dataset.simulation_dataset import MaxStableDataset import numpy as np
from test.test_utils import load_test_max_stable_models, load_test_spatial_coordinates, \ from rpy2.rinterface import RRuntimeError
load_test_3D_spatial_coordinates, \
load_test_1D_and_2D_spatial_coordinates from extreme_estimator.extreme_models.margin_model.linear_margin_model import LinearNonStationaryLocationMarginModel
from extreme_estimator.extreme_models.utils import set_seed_for_test
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.dataset.simulation_dataset import MaxStableDataset, MarginDataset
from test.test_utils import load_test_max_stable_models, load_test_3D_spatial_coordinates, \
load_test_1D_and_2D_spatial_coordinates, load_test_spatiotemporal_coordinates
class TestDataset(unittest.TestCase): class TestDataset(unittest.TestCase):
...@@ -32,5 +36,43 @@ class TestDataset(unittest.TestCase): ...@@ -32,5 +36,43 @@ class TestDataset(unittest.TestCase):
coordinates=coordinates) coordinates=coordinates)
class TestSpatioTemporalDataset(unittest.TestCase):
nb_obs = 2
nb_points = 3
nb_steps = 2
def setUp(self) -> None:
set_seed_for_test(seed=42)
self.coordinates = load_test_spatiotemporal_coordinates(nb_steps=self.nb_steps, nb_points=self.nb_points)[1]
def load_dataset(self, nb_obs):
smooth_margin_model = LinearNonStationaryLocationMarginModel(coordinates=self.coordinates,
starting_point=1)
self.dataset = MarginDataset.from_sampling(nb_obs=nb_obs,
margin_model=smooth_margin_model,
coordinates=self.coordinates)
def test_spatio_temporal_array(self):
self.load_dataset(nb_obs=1)
# Load observation for time 0
ind_time_0 = self.dataset.coordinates.ind_of_df_all_coordinates(coordinate_name=AbstractCoordinates.COORDINATE_T,
value=0)
observation_at_time_0_v1 = self.dataset.observations.df_maxima_gev.loc[ind_time_0].values.flatten()
# Load observation correspond to time 0
maxima_gev = self.dataset.maxima_gev_for_spatial_extremes_package()
maxima_gev = np.transpose(maxima_gev)
self.assertEqual(maxima_gev.shape, (3, 2))
observation_at_time_0_v2 = maxima_gev[:, 0]
equality = np.equal(observation_at_time_0_v1, observation_at_time_0_v2).all()
self.assertTrue(equality, msg='v1={} is different from v2={}'.format(observation_at_time_0_v1,
observation_at_time_0_v2))
def test_spatio_temporal_case_to_resolve(self):
self.load_dataset(nb_obs=2)
print(self.dataset.maxima_gev_for_spatial_extremes_package())
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment