import unittest
from itertools import product

import numpy as np

from extreme_fit.model.margin_model.linear_margin_model.linear_margin_model import \
    LinearNonStationaryLocationMarginModel
from extreme_fit.model.utils import set_seed_for_test, SafeRunException
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.coordinates.spatial_coordinates.abstract_spatial_coordinates import \
    AbstractSpatialCoordinates
from spatio_temporal_dataset.coordinates.spatio_temporal_coordinates.abstract_spatio_temporal_coordinates import \
    AbstractSpatioTemporalCoordinates
from spatio_temporal_dataset.coordinates.temporal_coordinates.generated_temporal_coordinates import \
    ConsecutiveTemporalCoordinates
from spatio_temporal_dataset.coordinates.transformed_coordinates.transformation.uniform_normalization import \
    BetweenZeroAndOneNormalization
from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset
from spatio_temporal_dataset.dataset.simulation_dataset import MaxStableDataset, MarginDataset
from spatio_temporal_dataset.slicer.split import Split
from spatio_temporal_dataset.spatio_temporal_observations.annual_maxima_observations import AnnualMaxima
from test.test_utils import load_test_max_stable_models, load_test_3D_spatial_coordinates, \
    load_test_1D_and_2D_spatial_coordinates, load_test_spatiotemporal_coordinates


class TestDataset(unittest.TestCase):
    """Tests for dataset construction, filtering and max-stable sampling."""

    nb_obs = 2
    nb_points = 2

    def test_remove_zero_from_dataset(self):
        """`AbstractDataset.remove_zeros` keeps 2 of the 4 initial coordinates."""
        coordinates, dataset_initial, observations = self.build_initial_dataset()
        dataset_without_zero = AbstractDataset.remove_zeros(observations, coordinates)
        self.assertEqual(len(dataset_initial.coordinates), 4)
        self.assertEqual(len(dataset_without_zero.coordinates), 2)

    def test_remove_top_maxima_from_dataset(self):
        """`AbstractDataset.remove_top_maxima` keeps the coordinates indexed 0 and 3."""
        coordinates, dataset_initial, observations = self.build_initial_dataset()
        dataset_without_top = AbstractDataset.remove_top_maxima(observations, coordinates)
        # The original call passed a stray third positional argument (4), which
        # unittest interprets as the failure message; it has been dropped.
        self.assertEqual(len(dataset_initial.coordinates), 4)
        self.assertEqual(len(dataset_without_top.coordinates), 2)
        self.assertEqual(set(dataset_without_top.coordinates.index.values), {0, 3})

    def build_initial_dataset(self):
        """Build a small spatio-temporal dataset: 2 stations (x=300, x=600) x 2 time steps.

        :return: tuple ``(coordinates, dataset_initial, observations)``
        """
        temporal_coordinates = ConsecutiveTemporalCoordinates.from_nb_temporal_steps(nb_temporal_steps=2)
        spatial_coordinates = AbstractSpatialCoordinates.from_list_x_coordinates([300, 600])
        coordinates = AbstractSpatioTemporalCoordinates(spatial_coordinates=spatial_coordinates,
                                                        temporal_coordinates=temporal_coordinates)
        # Keys are (x, t) coordinate values; two of the four maxima are zero.
        coordinate_values_to_maxima = {
            (300, 0): [0],
            (300, 1): [1],
            (600, 0): [2],
            (600, 1): [0],
        }
        observations = AnnualMaxima.from_coordinates(coordinates, coordinate_values_to_maxima)
        dataset_initial = AbstractDataset(observations, coordinates)
        return coordinates, dataset_initial, observations

    def test_max_stable_dataset_R1_and_R2(self):
        """Sampling works for every (1D/2D coordinates, max-stable model) pair."""
        max_stable_models = load_test_max_stable_models()[:]
        coordinates_list = load_test_1D_and_2D_spatial_coordinates(self.nb_points)
        # Distinct loop-variable name: the original re-bound `coordinates` while iterating over it.
        for coordinates, max_stable_model in product(coordinates_list, max_stable_models):
            dataset = MaxStableDataset.from_sampling(nb_obs=self.nb_obs,
                                                     max_stable_model=max_stable_model,
                                                     coordinates=coordinates)
            # A unittest assertion rather than a bare `assert`, which is stripped under `python -O`.
            self.assertEqual(len(dataset.df_dataset.columns),
                             self.nb_obs + dataset.coordinates.nb_coordinates)

    def test_max_stable_dataset_crash_R3(self):
        """Test to warn me when spatialExtremes handles R3"""
        # Fixtures are loaded outside the `assertRaises` block so that only the
        # sampling call itself is expected to raise SafeRunException.
        smith_process = load_test_max_stable_models()[0]
        coordinates = load_test_3D_spatial_coordinates(nb_points=self.nb_points)[0]
        with self.assertRaises(SafeRunException):
            MaxStableDataset.from_sampling(nb_obs=self.nb_obs,
                                           max_stable_model=smith_process,
                                           coordinates=coordinates)

    def test_max_stable_dataset_R3_with_R2_spatial_structure(self):
        """Test to create a dataset in R3, but where the spatial structure is only with respect to the 2 first
        coordinates"""
        coordinates = load_test_3D_spatial_coordinates(nb_points=self.nb_points,
                                                       transformation_class=BetweenZeroAndOneNormalization)[0]
        smith_process = load_test_max_stable_models()[0]
        MaxStableDataset.from_sampling(nb_obs=self.nb_obs,
                                       max_stable_model=smith_process,
                                       coordinates=coordinates,
                                       use_rmaxstab_with_2_coordinates=True)


class TestSpatioTemporalDataset(unittest.TestCase):
    """Check that the array returned by `maxima_gev_for_spatial_extremes_package`
    is consistent with the `df_maxima_gev` dataframe of the observations."""

    nb_obs = 2
    nb_points = 3
    nb_steps = 2

    def setUp(self) -> None:
        set_seed_for_test(seed=42)
        self.coordinates = load_test_spatiotemporal_coordinates(nb_steps=self.nb_steps,
                                                                nb_points=self.nb_points)[1]

    def load_dataset(self, nb_obs):
        """Sample a `MarginDataset` with `nb_obs` observations and store it in `self.dataset`."""
        smooth_margin_model = LinearNonStationaryLocationMarginModel(coordinates=self.coordinates,
                                                                     starting_point=1)
        self.dataset = MarginDataset.from_sampling(nb_obs=nb_obs,
                                                   margin_model=smooth_margin_model,
                                                   coordinates=self.coordinates)
        print(self.dataset)

    def _observations_where(self, coordinate_name, value):
        """Return the flattened maxima of the rows whose `coordinate_name` equals `value`."""
        ind = self.dataset.coordinates.ind_of_df_all_coordinates(coordinate_name=coordinate_name, value=value)
        return self.dataset.observations.df_maxima_gev.loc[ind].values.flatten()

    def _transposed_maxima(self, expected_shape):
        """Return the transposed maxima array after checking that it has `expected_shape`."""
        maxima_gev = np.transpose(self.dataset.maxima_gev_for_spatial_extremes_package())
        self.assertEqual(maxima_gev.shape, expected_shape)
        return maxima_gev

    def _assert_same_observations(self, observation_v1, observation_v2):
        """Fail with an explicit message unless both arrays are element-wise equal."""
        self.assertTrue(np.equal(observation_v1, observation_v2).all(),
                        msg='v1={} is different from v2={}'.format(observation_v1, observation_v2))

    def test_spatio_temporal_array_wrt_time(self):
        # The test could have been on a given station. But we decided to do it for a given time step.
        self.load_dataset(nb_obs=1)

        # Observations at time 0, read from the dataframe
        observation_at_time_0_v1 = self._observations_where(AbstractCoordinates.COORDINATE_T, value=0)

        # Observations at time 0, read from the first column of the transposed array
        maxima_gev = self._transposed_maxima(expected_shape=(self.nb_points, self.nb_steps))
        observation_at_time_0_v2 = maxima_gev[:, 0]

        self._assert_same_observations(observation_at_time_0_v1, observation_at_time_0_v2)

    def test_spatio_temporal_array_wrt_space(self):
        # Same check as the time-wise test, but for a given station (the one with x = -1).
        self.load_dataset(nb_obs=1)

        # Observations at that station, read from the dataframe
        observation_at_station_0_v1 = self._observations_where(AbstractCoordinates.COORDINATE_X, value=-1)

        # Observations at that station, read from the first row of the transposed array
        maxima_gev = self._transposed_maxima(expected_shape=(self.nb_points, self.nb_steps))
        observation_at_station_0_v2 = maxima_gev[0, :]

        self._assert_same_observations(observation_at_station_0_v1, observation_at_station_0_v2)

    def test_spatio_temporal_array_with_multiple_observations(self):
        # In this case, we must check that the observations are the same
        nb_obs = 2
        self.load_dataset(nb_obs=nb_obs)

        # Observations at the station with x = -1, read from the dataframe
        observation_at_station_0_v1 = self._observations_where(AbstractCoordinates.COORDINATE_X, value=-1)

        # Observations at that station, read from the first row of the transposed array
        maxima_gev = self._transposed_maxima(expected_shape=(self.nb_points, self.nb_steps * nb_obs))
        observation_at_station_0_v2 = maxima_gev[0, :]
        self.assertEqual(len(observation_at_station_0_v2), self.nb_steps * nb_obs,
                         msg='{}'.format(observation_at_station_0_v2))

        # The order does not really matter here but we check it anyway
        self._assert_same_observations(observation_at_station_0_v1, observation_at_station_0_v2)


if __name__ == '__main__':
    unittest.main()