Commit 199d9f66 authored by Le Roux Erwan
Browse files

[SLICER] improve slicer and split. add test_slicer

parent a1a75f01
No related merge requests found
Showing with 526 additions and 147 deletions
+526 -147
import time
from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset
from spatio_temporal_dataset.spatio_temporal_split import SpatialTemporalSplit
from spatio_temporal_dataset.slicer.split import Split
class AbstractEstimator(object):
......@@ -15,7 +15,10 @@ class AbstractEstimator(object):
def __init__(self, dataset: AbstractDataset):
self.dataset = dataset # type: AbstractDataset
self.additional_information = dict()
self.train_split = SpatialTemporalSplit.train
@property
def train_split(self):
return self.dataset.train_split
def fit(self):
ts = time.time()
......
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from extreme_estimator.gev_params import GevParams
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.spatio_temporal_split import SpatialTemporalSplit
from spatio_temporal_dataset.slicer.split import Split
class AbstractMarginFunction(object):
......@@ -16,13 +17,24 @@ class AbstractMarginFunction(object):
# Visualization parameters
self.visualization_axes = None
self.datapoint_display = False
self.spatio_temporal_split = SpatialTemporalSplit.all
self.spatio_temporal_split = Split.all
self.datapoint_marker = 'o'
def get_gev_params(self, coordinate: np.ndarray) -> GevParams:
"""Main method that maps each coordinate to its GEV parameters"""
pass
# Extraction function
@property
def gev_params_for_coordinates(self):
gev_params = [self.get_gev_params(coordinate).to_dict() for coordinate in self.coordinates.coordinates_values()]
gev_param_name_to_serie = {}
for gev_param_name in GevParams.GEV_PARAM_NAMES:
s = pd.Series(data=[p[gev_param_name] for p in gev_params], index=self.coordinates.index)
gev_param_name_to_serie[gev_param_name] = s
return gev_param_name_to_serie
# Visualization function
def set_datapoint_display_parameters(self, spatio_temporal_split, datapoint_marker):
......
import numpy as np
from extreme_estimator.extreme_models.margin_model.margin_function.abstract_margin_function import \
AbstractMarginFunction
from extreme_estimator.gev_params import GevParams
def abs_error(s1, s2):
    """Return the element-wise squared difference between ``s1`` and ``s2``.

    The difference ``s1 - s2`` goes through ``.abs()`` and then ``.pow(2)``,
    so despite the function's name the returned values are squared errors.
    NOTE(review): confirm whether a plain absolute error was intended.
    """
    difference = s1 - s2
    absolute_difference = difference.abs()
    return absolute_difference.pow(2)
def error_dict_between_margin_functions(margin1: AbstractMarginFunction, margin2: AbstractMarginFunction):
    """Compute, for each GEV parameter, the error between two margin functions.

    Both margin functions must share the same coordinates (asserted).
    Returns a dict mapping every name of ``GevParams.GEV_PARAM_NAMES`` to the
    result of ``abs_error`` applied to the two corresponding series taken from
    ``gev_params_for_coordinates``.
    """
    assert margin1.coordinates == margin2.coordinates
    params1 = margin1.gev_params_for_coordinates
    params2 = margin2.gev_params_for_coordinates
    return {name: abs_error(params1[name], params2[name])
            for name in GevParams.GEV_PARAM_NAMES}
......@@ -34,7 +34,7 @@ class AbstractMaxStableModel(AbstractModel):
maxima = maxima_gev if fit_marge else maxima_frech
assert isinstance(maxima, np.ndarray)
assert len(df_coordinates) == len(maxima), 'Coordinates and observations sizes should match,' \
'check that the same split was used for both objects \n,' \
'check that the same split was used for both objects, \n' \
'df_coordinates size: {}, data size {}'.format(len(df_coordinates),
len(maxima))
data = np.transpose(maxima)
......
......@@ -6,8 +6,11 @@ import numpy as np
import pandas as pd
from mpl_toolkits.mplot3d import Axes3D
from spatio_temporal_dataset.spatio_temporal_split import s_split_from_ratio, TEST_SPLIT_STR, \
TRAIN_SPLIT_STR, train_ind_from_s_split, SpatialTemporalSplit
from spatio_temporal_dataset.slicer.spatial_slicer import SpatialSlicer
from spatio_temporal_dataset.slicer.spatio_temporal_slicer import SpatioTemporalSlicer
from spatio_temporal_dataset.slicer.split import s_split_from_ratio, TEST_SPLIT_STR, \
TRAIN_SPLIT_STR, train_ind_from_s_split, Split
from spatio_temporal_dataset.slicer.temporal_slicer import TemporalSlicer
class AbstractCoordinates(object):
......@@ -31,7 +34,7 @@ class AbstractCoordinates(object):
# Create a split based on the train_split_ratio
if train_split_ratio is not None:
assert cls.COORDINATE_SPLIT not in df.columns, "A split has already been defined"
s_split = s_split_from_ratio(length=len(df), train_split_ratio=train_split_ratio)
s_split = s_split_from_ratio(index=df.index, train_split_ratio=train_split_ratio)
df[cls.COORDINATE_SPLIT] = s_split
# Potentially, a split column can be specified directly in df
if cls.COORDINATE_SPLIT not in df.columns:
......@@ -91,15 +94,21 @@ class AbstractCoordinates(object):
# Merged DataFrame of df_coord and s_split
return self.df_coord if self.s_split is None else self.df_coord.join(self.s_split)
def df_coordinates(self, split: SpatialTemporalSplit = SpatialTemporalSplit.all) -> pd.DataFrame:
if split is SpatialTemporalSplit.all or self.s_split is None:
def df_coordinates(self, split: Split = Split.all) -> pd.DataFrame:
if self.train_ind is None:
return self.df_coord
elif split in [SpatialTemporalSplit.train, SpatialTemporalSplit.test_temporal]:
if split is Split.all:
return self.df_coord
if split in [Split.train_temporal, Split.test_temporal]:
return self.df_coord
elif split in [Split.train_spatial, Split.train_spatiotemporal, Split.test_spatiotemporal_temporal]:
return self.df_coord.loc[self.train_ind]
else:
elif split in [Split.test_spatial, Split.test_spatiotemporal, Split.test_spatiotemporal_spatial]:
return self.df_coord.loc[~self.train_ind]
else:
raise NotImplementedError('Unknown split: {}'.format(split))
def coordinates_values(self, split: SpatialTemporalSplit = SpatialTemporalSplit.all) -> np.ndarray:
def coordinates_values(self, split: Split = Split.all) -> np.ndarray:
return self.df_coordinates(split).values
@property
......
import os
import numpy as np
import os.path as op
from typing import List
import numpy as np
import pandas as pd
from spatio_temporal_dataset.spatio_temporal_split import SpatialTemporalSplit, SpatioTemporalSlicer
from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import AbstractSpatioTemporalObservations
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.slicer.abstract_slicer import AbstractSlicer
from spatio_temporal_dataset.slicer.spatial_slicer import SpatialSlicer
from spatio_temporal_dataset.slicer.split import Split
from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import \
AbstractSpatioTemporalObservations
class AbstractDataset(object):
def __init__(self, observations: AbstractSpatioTemporalObservations, coordinates: AbstractCoordinates):
# is_same_index = spatio_temporal_observations.index == coordinates.index # type: pd.Series
# assert is_same_index.all()
def __init__(self, observations: AbstractSpatioTemporalObservations, coordinates: AbstractCoordinates,
slicer_class: type = SpatialSlicer):
assert pd.Index.equals(observations.index, coordinates.index)
assert isinstance(slicer_class, type)
self.observations = observations
self.coordinates = coordinates
self.spatio_temporal_slicer = SpatioTemporalSlicer(coordinates_train_ind=self.coordinates.train_ind,
observations_train_ind=self.observations.train_ind)
self.slicer = slicer_class(coordinates_train_ind=self.coordinates.train_ind,
observations_train_ind=self.observations.train_ind) # type: AbstractSlicer
assert isinstance(self.slicer, AbstractSlicer)
@classmethod
def from_csv(cls, csv_path: str):
......@@ -38,18 +45,36 @@ class AbstractDataset(object):
# todo: maybe I should add the split from the temporal observations
return self.observations.df_maxima_gev.join(self.coordinates.df_merged)
def df_coordinates(self, split: SpatialTemporalSplit = SpatialTemporalSplit.all):
def df_coordinates(self, split: Split = Split.all) -> pd.DataFrame:
return self.coordinates.df_coordinates(split=split)
# Observation wrapper
def maxima_gev(self, split: Split = Split.all) -> np.ndarray:
return self.observations.maxima_gev(split, self.slicer)
def maxima_frech(self, split: Split = Split.all) -> np.ndarray:
return self.observations.maxima_frech(split, self.slicer)
def set_maxima_frech(self, maxima_frech_values: np.ndarray, split: Split = Split.all):
self.observations.set_maxima_frech(maxima_frech_values, split, self.slicer)
# Coordinates wrapper
@property
def coordinates_values(self, split: SpatialTemporalSplit = SpatialTemporalSplit.all):
def coordinates_values(self, split: Split = Split.all) -> np.ndarray:
return self.coordinates.coordinates_values(split=split)
def maxima_gev(self, split: SpatialTemporalSplit = SpatialTemporalSplit.all) -> np.ndarray:
return self.observations.maxima_gev(split, self.spatio_temporal_slicer)
# Slicer wrapper
@property
def train_split(self) -> Split:
return self.slicer.train_split
def maxima_frech(self, split: SpatialTemporalSplit = SpatialTemporalSplit.all) -> np.ndarray:
return self.observations.maxima_frech(split, self.spatio_temporal_slicer)
@property
def test_split(self) -> Split:
return self.slicer.test_split
def set_maxima_frech(self, maxima_frech_values: np.ndarray, split: SpatialTemporalSplit = SpatialTemporalSplit.all):
self.observations.set_maxima_frech(maxima_frech_values, split, self.spatio_temporal_slicer)
\ No newline at end of file
@property
def splits(self) -> List[Split]:
return self.slicer.splits
from extreme_estimator.extreme_models.margin_model.abstract_margin_model import AbstractMarginModel
from extreme_estimator.extreme_models.max_stable_model.abstract_max_stable_model import AbstractMaxStableModel
from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import AbstractSpatioTemporalObservations
from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset
from spatio_temporal_dataset.slicer.spatial_slicer import SpatialSlicer
from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import \
AbstractSpatioTemporalObservations
from spatio_temporal_dataset.spatio_temporal_observations.annual_maxima_observations import \
MaxStableAnnualMaxima, AnnualMaxima, MarginAnnualMaxima, FullAnnualMaxima
MaxStableAnnualMaxima, MarginAnnualMaxima, FullAnnualMaxima
class SimulatedDataset(AbstractDataset):
......@@ -15,30 +17,33 @@ class SimulatedDataset(AbstractDataset):
def __init__(self, observations: AbstractSpatioTemporalObservations,
coordinates: AbstractCoordinates,
slicer_class: type = SpatialSlicer,
max_stable_model: AbstractMaxStableModel = None,
margin_model: AbstractMarginModel = None):
super().__init__(observations, coordinates)
super().__init__(observations, coordinates, slicer_class)
assert margin_model is not None or max_stable_model is not None
self.margin_model = margin_model # type: AbstractMarginModel
self.max_stable_model = max_stable_model # type: AbstractMaxStableModel
self.max_stable_model = max_stable_model # type: AbstractMaxStableModel
class MaxStableDataset(SimulatedDataset):
@classmethod
def from_sampling(cls, nb_obs: int, max_stable_model: AbstractMaxStableModel, coordinates: AbstractCoordinates,
train_split_ratio: float = None):
train_split_ratio: float = None, slicer_class: type = SpatialSlicer):
observations = MaxStableAnnualMaxima.from_sampling(nb_obs, max_stable_model, coordinates, train_split_ratio)
return cls(observations=observations, coordinates=coordinates, max_stable_model=max_stable_model)
return cls(observations=observations, coordinates=coordinates, slicer_class=slicer_class,
max_stable_model=max_stable_model)
class MarginDataset(SimulatedDataset):
@classmethod
def from_sampling(cls, nb_obs: int, margin_model: AbstractMarginModel, coordinates: AbstractCoordinates,
train_split_ratio: float = None):
train_split_ratio: float = None, slicer_class: type = SpatialSlicer):
observations = MarginAnnualMaxima.from_sampling(nb_obs, coordinates, margin_model, train_split_ratio)
return cls(observations=observations, coordinates=coordinates, margin_model=margin_model)
return cls(observations=observations, coordinates=coordinates, slicer_class=slicer_class,
margin_model=margin_model)
class FullSimulatedDataset(SimulatedDataset):
......@@ -47,8 +52,9 @@ class FullSimulatedDataset(SimulatedDataset):
def from_double_sampling(cls, nb_obs: int, max_stable_model: AbstractMaxStableModel,
coordinates: AbstractCoordinates,
margin_model: AbstractMarginModel,
train_split_ratio: float = None):
train_split_ratio: float = None,
slicer_class: type = SpatialSlicer):
observations = FullAnnualMaxima.from_double_sampling(nb_obs, max_stable_model,
coordinates, margin_model, train_split_ratio)
return cls(observations=observations, coordinates=coordinates, max_stable_model=max_stable_model,
margin_model=margin_model)
return cls(observations=observations, coordinates=coordinates, slicer_class=slicer_class,
max_stable_model=max_stable_model, margin_model=margin_model)
from typing import Union, List
import pandas as pd
from spatio_temporal_dataset.slicer.split import Split
class AbstractSlicer(object):
    """Base class of the slicers that restrict a DataFrame to a given Split.

    A slicer stores two optional train indicators: one aligned with the
    DataFrame index (received as ``coordinates_train_ind``) and one aligned
    with the DataFrame columns (received as ``observations_train_ind``).
    They are negated with ``~`` and used inside ``.loc`` by the subclasses,
    so they are expected to be boolean Series (or None when not handled).

    Subclasses are expected to override ``splits``, ``train_split``,
    ``test_split``, ``some_required_ind_are_not_defined`` and
    ``specialized_loc_split``.
    """

    def __init__(self, coordinates_train_ind: Union[None, pd.Series], observations_train_ind: Union[None, pd.Series]):
        self.index_train_ind = coordinates_train_ind  # type: Union[None, pd.Series]
        self.column_train_ind = observations_train_ind  # type: Union[None, pd.Series]

    @property
    def train_split(self) -> Split:
        """Split used for training; placeholder returning None in the base class."""
        pass

    @property
    def test_split(self) -> Split:
        """Split used for testing; placeholder returning None in the base class."""
        pass

    @property
    def splits(self) -> List[Split]:
        """Splits accepted by ``loc_split``; placeholder returning None in the base class."""
        pass

    @property
    def index_test_ind(self) -> pd.Series:
        """Negation of the index train indicator (requires it to be defined)."""
        return ~self.index_train_ind

    # todo: test should be the same as train when we don't care about that in the split
    @property
    def column_test_ind(self) -> pd.Series:
        """Negation of the column train indicator (requires it to be defined)."""
        return ~self.column_train_ind

    @property
    def some_required_ind_are_not_defined(self):
        """Whether an indicator needed by this slicer is missing; placeholder returning None here."""
        pass

    def summary(self):
        """Print, for each indicator, its length and its number of train entries."""
        print('Slicer summary: \n')
        for s, global_name in [(self.index_train_ind, "Spatial"), (self.column_train_ind, "Temporal")]:
            print(global_name + ' split')
            if s is None:
                print('Not handled by this slicer')
            else:
                for f, name in [(len, 'Total'), (sum, 'train')]:
                    print("{}: {}".format(name, f(s)))
            print('\n')

    def loc_split(self, df: pd.DataFrame, split: Split):
        """Return the part of ``df`` that corresponds to ``split``.

        ``Split.all`` always returns ``df`` untouched. Any other split must
        belong to ``self.splits`` (AssertionError otherwise).
        """
        assert isinstance(split, Split)
        if split is Split.all:
            return df
        # split should belong to the list of splits accepted by the slicer
        assert split in self.splits, "split:{}, slicer_type:{}".format(split, type(self))
        # When an indicator required by this slicer was not defined, we do not
        # crash: all the data is returned, whatever the requested split.
        if self.some_required_ind_are_not_defined:
            return df
        return self.specialized_loc_split(df=df, split=split)

    def specialized_loc_split(self, df: pd.DataFrame, split: Split):
        """Slice ``df`` for ``split``; must be overridden by the child class.

        Raises NotImplementedError instead of silently returning None, so a
        missing override fails here rather than later on ``None.values``.
        """
        raise NotImplementedError(
            '{} must implement specialized_loc_split'.format(type(self).__name__))
def slice(df: pd.DataFrame, split: Split = Split.all, slicer: AbstractSlicer = None) -> pd.DataFrame:
    """Return the part of ``df`` selected by ``split``.

    With a slicer, the work is delegated to ``slicer.loc_split``. Without
    one, only ``Split.all`` is accepted (asserted) and ``df`` is returned
    unchanged.
    """
    if slicer is not None:
        return slicer.loc_split(df, split)
    assert split is Split.all
    return df
from typing import List, Union
import pandas as pd
from spatio_temporal_dataset.slicer.abstract_slicer import AbstractSlicer
from spatio_temporal_dataset.slicer.split import Split
class SpatialSlicer(AbstractSlicer):
    """Slicer that only selects rows, using the index train indicator."""

    SPLITS = [Split.train_spatial, Split.test_spatial]

    def __init__(self, coordinates_train_ind: Union[None, pd.Series], observations_train_ind: Union[None, pd.Series]):
        # The observations indicator is deliberately dropped: None is forwarded instead
        super().__init__(coordinates_train_ind=coordinates_train_ind, observations_train_ind=None)

    @property
    def splits(self) -> List[Split]:
        """Splits accepted by this slicer."""
        return self.SPLITS

    @property
    def train_split(self) -> Split:
        return Split.train_spatial

    @property
    def test_split(self) -> Split:
        return Split.test_spatial

    @property
    def some_required_ind_are_not_defined(self):
        """True when the index train indicator is missing."""
        return self.index_train_ind is None

    def specialized_loc_split(self, df: pd.DataFrame, split: Split):
        """Keep the train rows or the test rows of ``df``; every column is kept."""
        assert pd.Index.equals(df.index, self.index_train_ind.index)
        if split is Split.train_spatial:
            row_mask = self.index_train_ind
        elif split is Split.test_spatial:
            row_mask = self.index_test_ind
        else:
            return None
        return df.loc[row_mask, :]
from typing import List
import pandas as pd
from spatio_temporal_dataset.slicer.abstract_slicer import AbstractSlicer
from spatio_temporal_dataset.slicer.split import Split
class SpatioTemporalSlicer(AbstractSlicer):
    """Slicer that selects rows and columns from the two train indicators."""

    SPLITS = [Split.train_spatiotemporal,
              Split.test_spatiotemporal,
              Split.test_spatiotemporal_spatial,
              Split.test_spatiotemporal_temporal]

    @property
    def splits(self) -> List[Split]:
        """Splits accepted by this slicer."""
        return self.SPLITS

    @property
    def train_split(self) -> Split:
        return Split.train_spatiotemporal

    @property
    def test_split(self) -> Split:
        return Split.test_spatiotemporal

    @property
    def some_required_ind_are_not_defined(self):
        """True as soon as one of the two train indicators is missing."""
        both_defined = self.index_train_ind is not None and self.column_train_ind is not None
        return not both_defined

    def specialized_loc_split(self, df: pd.DataFrame, split: Split):
        """Select the (rows, columns) combination of ``df`` matching ``split``."""
        assert pd.Index.equals(df.columns, self.column_train_ind.index)
        assert pd.Index.equals(df.index, self.index_train_ind.index)
        if split is Split.train_spatiotemporal:
            rows, columns = self.index_train_ind, self.column_train_ind
        elif split is Split.test_spatiotemporal:
            rows, columns = self.index_test_ind, self.column_test_ind
        elif split is Split.test_spatiotemporal_spatial:
            # test rows observed on the train columns
            rows, columns = self.index_test_ind, self.column_train_ind
        elif split is Split.test_spatiotemporal_temporal:
            # train rows observed on the test columns
            rows, columns = self.index_train_ind, self.column_test_ind
        else:
            return None
        return df.loc[rows, columns]
from enum import Enum
import pandas as pd
class Split(Enum):
    """Identifiers of the data subsets that a slicer can be asked for.

    ``all`` stands for the complete data; the other members are grouped by
    the kind of slicing they belong to.
    """

    all = 0

    # Splits along both the spatial and the temporal dimensions
    train_spatiotemporal = 1
    test_spatiotemporal = 2
    test_spatiotemporal_spatial = 3
    test_spatiotemporal_temporal = 4

    # Splits along the spatial dimension only
    train_spatial = 5
    test_spatial = 6

    # Splits along the temporal dimension only
    train_temporal = 7
    test_temporal = 8
# Every member of Split except Split.all
ALL_SPLITS_EXCEPT_ALL = [split for split in Split if split is not Split.all]
# NOTE(review): SPLIT_NAME is not referenced in the visible code - presumably a column name; confirm
SPLIT_NAME = 'split'
# Labels stored in a split Series: written by s_split_from_ratio, read by train_ind_from_s_split
TRAIN_SPLIT_STR = 'train_split'
TEST_SPLIT_STR = 'test_split'
def train_ind_from_s_split(s_split):
    """Turn a Series of split labels into a boolean train indicator.

    Returns None when ``s_split`` is None; otherwise the result of
    ``s_split.isin([TRAIN_SPLIT_STR])``, i.e. True where the label is the
    train label.
    """
    return None if s_split is None else s_split.isin([TRAIN_SPLIT_STR])
def s_split_from_ratio(index, train_split_ratio):
    """Build a Series of train/test labels over ``index``.

    Every entry starts as TEST_SPLIT_STR; ``int(len(index) * train_split_ratio)``
    entries drawn with ``Series.sample`` are then relabelled TRAIN_SPLIT_STR.
    Asserts that the ratio lies strictly between 0 and 1 and that both the
    train part and the test part are non-empty.
    """
    nb_labels = len(index)
    assert 0 < train_split_ratio < 1
    labels = pd.Series(TEST_SPLIT_STR, index=index)
    nb_train = int(nb_labels * train_split_ratio)
    assert 0 < nb_train < nb_labels
    # Random draw (without replacement) of the entries that become train entries
    sampled_index = labels.sample(n=nb_train).index
    assert 0 < len(sampled_index) < nb_labels, \
        "number of training points:{} length:{}".format(len(sampled_index), nb_labels)
    labels.loc[sampled_index] = TRAIN_SPLIT_STR
    return labels
from typing import List, Union
import pandas as pd
from spatio_temporal_dataset.slicer.abstract_slicer import AbstractSlicer
from spatio_temporal_dataset.slicer.split import Split
class TemporalSlicer(AbstractSlicer):
    """Slicer that only selects columns, using the column train indicator."""

    SPLITS = [Split.train_temporal, Split.test_temporal]

    def __init__(self, coordinates_train_ind: Union[None, pd.Series], observations_train_ind: Union[None, pd.Series]):
        # The coordinates indicator is deliberately dropped: None is forwarded instead
        super().__init__(coordinates_train_ind=None, observations_train_ind=observations_train_ind)

    @property
    def splits(self) -> List[Split]:
        """Splits accepted by this slicer."""
        return self.SPLITS

    @property
    def train_split(self) -> Split:
        return Split.train_temporal

    @property
    def test_split(self) -> Split:
        return Split.test_temporal

    @property
    def some_required_ind_are_not_defined(self):
        """True when the column train indicator is missing."""
        return self.column_train_ind is None

    def specialized_loc_split(self, df: pd.DataFrame, split: Split):
        """Keep the train columns or the test columns of ``df``; every row is kept."""
        assert pd.Index.equals(df.columns, self.column_train_ind.index)
        if split is Split.train_temporal:
            column_mask = self.column_train_ind
        elif split is Split.test_temporal:
            column_mask = self.column_test_ind
        else:
            return None
        return df.loc[:, column_mask]
import pandas as pd
import numpy as np
from spatio_temporal_dataset.spatio_temporal_split import SpatialTemporalSplit, SpatioTemporalSlicer, \
train_ind_from_s_split, TEST_SPLIT_STR, TRAIN_SPLIT_STR, s_split_from_ratio, spatio_temporal_slice
from spatio_temporal_dataset.slicer.abstract_slicer import slice, AbstractSlicer
from spatio_temporal_dataset.slicer.split import Split, \
train_ind_from_s_split, TEST_SPLIT_STR, TRAIN_SPLIT_STR, s_split_from_ratio
class AbstractSpatioTemporalObservations(object):
......@@ -22,30 +23,39 @@ class AbstractSpatioTemporalObservations(object):
raise AttributeError('A split is already defined, there is no need to specify a ratio')
elif s_split is not None or train_split_ratio is not None:
if train_split_ratio:
s_split = s_split_from_ratio(length=self.nb_obs, train_split_ratio=train_split_ratio)
s_split = s_split_from_ratio(index=self._df_maxima.columns, train_split_ratio=train_split_ratio)
assert len(s_split) == len(self._df_maxima.columns)
assert s_split.isin([TRAIN_SPLIT_STR, TEST_SPLIT_STR]).all()
self.s_split = s_split
@property
def nb_obs(self):
def _df_maxima(self) -> pd.DataFrame:
if self.df_maxima_frech is not None:
return len(self.df_maxima_frech.columns)
return self.df_maxima_frech
else:
return len(self.df_maxima_gev.columns)
return self.df_maxima_gev
@property
def index(self) -> pd.Index:
return self._df_maxima.index
@property
def nb_obs(self) -> int:
return len(self._df_maxima.columns)
@classmethod
def from_df(cls, df):
pass
def maxima_gev(self, split: SpatialTemporalSplit = SpatialTemporalSplit.all, slicer: SpatioTemporalSlicer = None):
return spatio_temporal_slice(self.df_maxima_gev, split, slicer).values
def maxima_gev(self, split: Split = Split.all, slicer: AbstractSlicer = None) -> np.ndarray:
return slice(self.df_maxima_gev, split, slicer).values
def maxima_frech(self, split: SpatialTemporalSplit = SpatialTemporalSplit.all, slicer: SpatioTemporalSlicer = None):
return spatio_temporal_slice(self.df_maxima_frech, split, slicer).values
def maxima_frech(self, split: Split = Split.all, slicer: AbstractSlicer = None) -> np.ndarray:
return slice(self.df_maxima_frech, split, slicer).values
def set_maxima_frech(self, maxima_frech_values: np.ndarray, split: SpatialTemporalSplit = SpatialTemporalSplit.all,
slicer: SpatioTemporalSlicer = None):
df = spatio_temporal_slice(self.df_maxima_frech, split, slicer)
def set_maxima_frech(self, maxima_frech_values: np.ndarray, split: Split = Split.all,
slicer: AbstractSlicer = None):
df = slice(self.df_maxima_frech, split, slicer)
df.loc[:] = maxima_frech_values
@property
......
from enum import Enum
import pandas as pd
class SpatialTemporalSplit(Enum):
    """Identifiers of the data subsets handled by the spatio-temporal slicer."""

    # Complete data
    all = 0

    # Train / test subsets
    train = 1
    test = 2

    # Partial test subsets
    test_temporal = 3
    test_spatial = 4
class SpatioTemporalSlicer(object):
    """Select rows and columns of a DataFrame from two train indicators.

    ``index_train_ind`` is aligned with the DataFrame index and
    ``column_train_ind`` with its columns. Either both are given or both are
    None (asserted at construction).
    """

    def __init__(self, coordinates_train_ind: pd.Series, observations_train_ind: pd.Series):
        self.index_train_ind = coordinates_train_ind  # type: pd.Series
        self.column_train_ind = observations_train_ind  # type: pd.Series
        if self.ind_are_not_defined:
            # A single missing indicator is rejected: only "both missing" is allowed
            msg = "One split was not defined \n \n" \
                  "index: \n {} \n, column:\n {} \n".format(self.index_train_ind, self.column_train_ind)
            both_missing = self.index_train_ind is None and self.column_train_ind is None
            assert both_missing, msg

    def summary(self):
        """Print, for each indicator, its length and its number of train entries."""
        print('SpatioTemporal split summary: \n')
        indicators = [(self.index_train_ind, "Spatial"), (self.column_train_ind, "Temporal")]
        for indicator, global_name in indicators:
            print(global_name + ' split')
            for func, name in [(len, 'Total'), (sum, 'train')]:
                print("{}: {}".format(name, func(indicator)))
            print('\n')

    @property
    def index_test_ind(self) -> pd.Series:
        """Negation of the index train indicator."""
        return ~self.index_train_ind

    @property
    def column_test_ind(self) -> pd.Series:
        """Negation of the column train indicator."""
        return ~self.column_train_ind

    @property
    def ind_are_not_defined(self):
        """True as soon as one of the two train indicators is None."""
        both_defined = self.index_train_ind is not None and self.column_train_ind is not None
        return not both_defined

    def loc_split(self, df: pd.DataFrame, split: SpatialTemporalSplit):
        """Return the part of ``df`` selected by ``split``.

        ``df`` is returned unchanged for ``SpatialTemporalSplit.all`` or when
        an indicator is missing.
        """
        assert isinstance(split, SpatialTemporalSplit)
        # By default, if one of the two indicators is not defined, all the data is returned
        if self.ind_are_not_defined or split is SpatialTemporalSplit.all:
            return df
        assert pd.RangeIndex.equals(df.columns, self.column_train_ind.index)
        assert pd.RangeIndex.equals(df.index, self.index_train_ind.index)
        if split is SpatialTemporalSplit.train:
            rows, columns = self.index_train_ind, self.column_train_ind
        elif split is SpatialTemporalSplit.test:
            rows, columns = self.index_test_ind, self.column_test_ind
        elif split is SpatialTemporalSplit.test_spatial:
            rows, columns = self.index_test_ind, self.column_train_ind
        elif split is SpatialTemporalSplit.test_temporal:
            rows, columns = self.index_train_ind, self.column_test_ind
        else:
            return None
        return df.loc[rows, columns]
# NOTE(review): SPLIT_NAME is not referenced in the visible code - presumably a column name; confirm
SPLIT_NAME = 'split'
# Labels stored in a split Series: written by s_split_from_ratio, read by train_ind_from_s_split
TRAIN_SPLIT_STR = 'train_split'
TEST_SPLIT_STR = 'test_split'
def train_ind_from_s_split(s_split):
    """Turn a Series of split labels into a boolean train indicator.

    None is passed through; otherwise the result is
    ``s_split.isin([TRAIN_SPLIT_STR])``.
    """
    if s_split is not None:
        return s_split.isin([TRAIN_SPLIT_STR])
    return None
def s_split_from_ratio(length, train_split_ratio):
    """Build a Series of ``length`` train/test labels with a default index.

    Every entry starts as TEST_SPLIT_STR; ``int(length * train_split_ratio)``
    entries drawn with ``Series.sample`` are relabelled TRAIN_SPLIT_STR.
    The ratio must lie strictly between 0 and 1 (asserted).
    """
    assert 0 < train_split_ratio < 1
    labels = pd.Series([TEST_SPLIT_STR] * length)
    nb_train = int(length * train_split_ratio)
    # Random draw (without replacement) of the entries that become train entries
    sampled_index = labels.sample(n=nb_train).index
    labels.loc[sampled_index] = TRAIN_SPLIT_STR
    return labels
def spatio_temporal_slice(df: pd.DataFrame, split: SpatialTemporalSplit = SpatialTemporalSplit.all,
                          slicer: SpatioTemporalSlicer = None) -> pd.DataFrame:
    """Return the part of ``df`` selected by ``split``.

    With a slicer, the work is delegated to ``slicer.loc_split``. Without
    one, only ``SpatialTemporalSplit.all`` is accepted (asserted) and ``df``
    is returned unchanged.
    """
    if slicer is not None:
        return slicer.loc_split(df, split)
    assert split is SpatialTemporalSplit.all
    return df
......@@ -8,8 +8,8 @@ from test.test_utils import load_test_max_stable_models, load_smooth_margin_mode
class TestFullEstimators(unittest.TestCase):
DISPLAY = False
nb_obs = 10
nb_points = 5
nb_obs = 3
nb_points = 2
def setUp(self):
super().setUp()
......
......@@ -11,8 +11,8 @@ from test.test_utils import load_test_max_stable_models, load_test_1D_and_2D_coo
class TestMaxStableEstimators(unittest.TestCase):
DISPLAY = False
nb_points = 5
nb_obs = 10
nb_points = 2
nb_obs = 3
def setUp(self):
super().setUp()
......
import pandas as pd
import numpy as np
from rpy2.rinterface import RRuntimeError
import unittest
from itertools import product
from extreme_estimator.extreme_models.margin_model.smooth_margin_model import ConstantMarginModel
from extreme_estimator.extreme_models.max_stable_model.max_stable_models import Smith
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.coordinates.unidimensional_coordinates.coordinates_1D import LinSpaceCoordinates
from spatio_temporal_dataset.dataset.simulation_dataset import MaxStableDataset, FullSimulatedDataset
from spatio_temporal_dataset.slicer.spatial_slicer import SpatialSlicer
from spatio_temporal_dataset.slicer.spatio_temporal_slicer import SpatioTemporalSlicer
from spatio_temporal_dataset.slicer.split import ALL_SPLITS_EXCEPT_ALL, Split
from spatio_temporal_dataset.slicer.temporal_slicer import TemporalSlicer
from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import \
AbstractSpatioTemporalObservations
class TestSlicerForDataset(unittest.TestCase):
    """Check the shape of the observations returned by a dataset for each slicer."""

    def __init__(self, methodName: str = 'runTest') -> None:
        # 'runTest' is the default of unittest.TestCase; the previous default
        # (Ellipsis, copied from a type stub) made no-argument construction fail.
        super().__init__(methodName)
        self.dataset = None

    nb_spatial_points = 2
    nb_temporal_obs = 2
    # Shape of the observations when nothing is sliced away
    complete_shape = (nb_spatial_points, nb_temporal_obs)

    def load_dataset(self, slicer_class, split_ratio_spatial, split_ratio_temporal):
        """Sample a FullSimulatedDataset with the given slicer and split ratios."""
        coordinates = LinSpaceCoordinates.from_nb_points(nb_points=self.nb_spatial_points,
                                                         train_split_ratio=split_ratio_spatial)
        return FullSimulatedDataset.from_double_sampling(nb_obs=self.nb_temporal_obs,
                                                         train_split_ratio=split_ratio_temporal,
                                                         margin_model=ConstantMarginModel(coordinates=coordinates),
                                                         coordinates=coordinates, max_stable_model=Smith(),
                                                         slicer_class=slicer_class)

    def get_shape(self, dataset, split):
        """Shape of ``dataset.maxima_frech`` for ``split``."""
        return dataset.maxima_frech(split).shape

    def test_spatiotemporal_slicer_for_dataset(self):
        # keys: (spatial split ratio, temporal split ratio)
        ind_tuple_to_observation_shape = {
            (None, None): self.complete_shape,
            (None, 0.5): self.complete_shape,
            (0.5, None): self.complete_shape,
            (0.5, 0.5): (1, 1),
        }
        self.check_shapes(ind_tuple_to_observation_shape, SpatioTemporalSlicer)

    def test_spatial_slicer_for_dataset(self):
        ind_tuple_to_observation_shape = {
            (None, None): self.complete_shape,
            (None, 0.5): self.complete_shape,
            (0.5, None): (1, 2),
            (0.5, 0.5): (1, 2),
        }
        self.check_shapes(ind_tuple_to_observation_shape, SpatialSlicer)

    def test_temporal_slicer_for_dataset(self):
        ind_tuple_to_observation_shape = {
            (None, None): self.complete_shape,
            (None, 0.5): (2, 1),
            (0.5, None): self.complete_shape,
            (0.5, 0.5): (2, 1),
        }
        self.check_shapes(ind_tuple_to_observation_shape, TemporalSlicer)

    def check_shapes(self, ind_tuple_to_observation_shape, slicer_type):
        """Assert the expected shape for every split accepted by the slicer.

        Splits that the slicer does not accept must raise an AssertionError.
        """
        for split_ratio, data_shape in ind_tuple_to_observation_shape.items():
            dataset = self.load_dataset(slicer_type, *split_ratio)
            self.assertEqual(self.complete_shape, self.get_shape(dataset, Split.all))
            for split in ALL_SPLITS_EXCEPT_ALL:
                if split in dataset.slicer.splits:
                    self.assertEqual(data_shape, self.get_shape(dataset, split))
                else:
                    with self.assertRaises(AssertionError):
                        self.get_shape(dataset, split)
class TestSlicerForCoordinates(unittest.TestCase):
    """Check the number of coordinates returned for every split."""

    def nb_coordinates(self, coordinates: AbstractCoordinates, split):
        """Number of coordinates that ``coordinates`` returns for ``split``."""
        values = coordinates.coordinates_values(split)
        return len(values)

    def test_slicer_for_coordinates(self):
        # Splits for which a spatial train/test selection is expected
        spatially_sliced_splits = SpatialSlicer.SPLITS + SpatioTemporalSlicer.SPLITS
        for split in Split:
            # With a split ratio: one point out of two for the spatially sliced splits
            coordinates1 = LinSpaceCoordinates.from_nb_points(nb_points=2, train_split_ratio=0.5)
            expected_nb = 1 if split in spatially_sliced_splits else 2
            self.assertEqual(self.nb_coordinates(coordinates1, split), expected_nb)
            # Without a split ratio: every point is always returned
            coordinates2 = LinSpaceCoordinates.from_nb_points(nb_points=2)
            self.assertEqual(self.nb_coordinates(coordinates2, split), 2)
class TestSlicerForObservations(unittest.TestCase):
    """Check the number of observations returned for every slicer and split."""

    def load_observations(self, split_ratio_temporal):
        """Build observations with 4 rows and the 2 columns 'year1' and 'year2'."""
        df = pd.DataFrame.from_dict(
            {
                'year1': [1 for _ in range(4)],
                'year2': [2 for _ in range(4)],
            })
        return AbstractSpatioTemporalObservations(df_maxima_frech=df, train_split_ratio=split_ratio_temporal)

    def nb_obs(self, observations, split, slicer):
        """Number of columns of ``observations.maxima_frech(split, slicer)``."""
        return len(np.transpose(observations.maxima_frech(split, slicer)))

    def test_slicer_for_observations(self):
        observations = self.load_observations(0.5)
        # Without a slicer, a slice should be returned only for Split.all
        self.assertEqual(2, self.nb_obs(observations, Split.all, None))
        for split in ALL_SPLITS_EXCEPT_ALL:
            with self.assertRaises(AssertionError):
                observations.maxima_frech(split, None)
        # For the other slicers we try out all the possible combinations
        slicer_type_to_size = {
            SpatialSlicer: 2,
            TemporalSlicer: 1,
            SpatioTemporalSlicer: 1,
        }
        for slicer_type, size in slicer_type_to_size.items():
            for coordinates_train_ind in [pd.Series([True, True, True, False]), None]:
                slicer = slicer_type(coordinates_train_ind=coordinates_train_ind,
                                     observations_train_ind=observations.train_ind)
                self.assertEqual(2, self.nb_obs(observations, Split.all, slicer))
                # SpatioTemporalSlicer slices only if both train_ind are available;
                # if coordinates_train_ind is None, it should return all the data.
                # The expectation is computed per iteration so that it never leaks
                # into the next one, whatever the iteration order.
                if slicer_type is SpatioTemporalSlicer and coordinates_train_ind is None:
                    expected_size = 2
                else:
                    expected_size = size
                for split in ALL_SPLITS_EXCEPT_ALL:
                    if split in slicer.splits:
                        self.assertEqual(expected_size, self.nb_obs(observations, split, slicer))
                    else:
                        with self.assertRaises(AssertionError):
                            observations.maxima_frech(split, slicer)
# Run the tests of this module when it is executed as a script
if __name__ == '__main__':
    unittest.main()
......@@ -6,7 +6,7 @@ import pandas as pd
from spatio_temporal_dataset.spatio_temporal_observations.abstract_spatio_temporal_observations import AbstractSpatioTemporalObservations
class TestTemporalObservations(unittest.TestCase):
class TestSpatioTemporalObservations(unittest.TestCase):
DISPLAY = False
def test_set_maxima_gev(self):
......
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment