Commit 9818f99a authored by Le Roux Erwan's avatar Le Roux Erwan
Browse files

[DATA] fit gev on each SAFRAN massif, add test.

parent 4d582087
No related merge requests found
Showing with 294 additions and 172 deletions
+294 -172
...@@ -32,8 +32,11 @@ def spatial_extreme_gevmle_fit(x_gev): ...@@ -32,8 +32,11 @@ def spatial_extreme_gevmle_fit(x_gev):
class GevMleFit(object): class GevMleFit(object):
def __init__(self, x_gev: np.ndarray): def __init__(self, x_gev: np.ndarray):
assert np.ndim(x_gev) == 1
assert len(x_gev) > 1
self.x_gev = x_gev self.x_gev = x_gev
self.mle_params = spatial_extreme_gevmle_fit(x_gev) self.mle_params = spatial_extreme_gevmle_fit(x_gev)
self.gev_params = GevParams.from_dict(self.mle_params)
self.shape = self.mle_params[GevParams.GEV_SHAPE] self.shape = self.mle_params[GevParams.GEV_SHAPE]
self.scale = self.mle_params[GevParams.GEV_SCALE] self.scale = self.mle_params[GevParams.GEV_SCALE]
self.loc = self.mle_params[GevParams.GEV_LOC] self.loc = self.mle_params[GevParams.GEV_LOC]
import numpy as np import numpy as np
import pandas as pd
from extreme_estimator.extreme_models.utils import r from extreme_estimator.extreme_models.utils import r
...@@ -38,8 +39,10 @@ class GevParams(object): ...@@ -38,8 +39,10 @@ class GevParams(object):
} }
def to_array(self) -> np.ndarray: def to_array(self) -> np.ndarray:
gev_param_name_to_value = self.to_dict() return self.to_serie().values
return np.array([gev_param_name_to_value[gev_param_name] for gev_param_name in self.GEV_PARAM_NAMES])
def to_serie(self) -> pd.Series:
return pd.Series(self.to_dict(), index=self.GEV_PARAM_NAMES)
# GEV quantiles # GEV quantiles
......
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
import os
import os.path as op
import pandas as pd
from netCDF4 import Dataset
from extreme_estimator.gev.fit_gev import gev_mle_fit
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.coordinates.spatial_coordinates.abstract_spatial_coordinates import \
AbstractSpatialCoordinates
from utils import get_full_path
from collections import OrderedDict
from datetime import datetime, timedelta
class Safran(object):
def __init__(self):
self.year_to_dataset = OrderedDict()
print(self.safran_full_path)
nc_files = [(self.str_to_year(file), file) for file in os.listdir(self.safran_full_path) if file.endswith('.nc')]
for year, nc_file in sorted(nc_files, key=lambda t: t[0]):
self.year_to_dataset[year] = Dataset(op.join(self.safran_full_path, nc_file))
#
# Map each index to the corresponding massif
print(list(self.year_to_dataset.keys()))
massifs_str = self.year_to_dataset[1958].massifsList.split('/')
self.int_to_massif = {j: Massif.from_str(massif_str) for j, massif_str in enumerate(massifs_str)}
safran_massif_names = set([massif.name for massif in self.int_to_massif.values()])
# # Map each datetime to a snowfall amount in kg/m2
self.datetime_to_snowfall_amount = OrderedDict()
self.year_to_snowfall_amount = {}
for year, dataset in self.year_to_dataset.items():
# Starting date
start_datetime = self.new_year_day(year)
start_times = np.array(dataset.variables['time'])[:-1]
snowfall_rates = np.array(dataset.variables['Snowf'])
mean_snowfall_rates = 0.5 * (snowfall_rates[:-1] + snowfall_rates[1:])
self.year_to_snowfall_amount[year] = []
for start_time, mean_snowrate in zip(start_times, mean_snowfall_rates):
# time is in seconds, snowfall is in kg/m2/s
delta_time = timedelta(seconds=start_time)
current_datetime = start_datetime + delta_time
snowfall_in_one_hour = 60 * 60 * mean_snowrate
# TODO: for the datetime, I should put the middle of the interval, isntead of the start
self.datetime_to_snowfall_amount[current_datetime] = snowfall_in_one_hour
self.year_to_snowfall_amount[year].append(snowfall_in_one_hour)
# Extract the maxima, and fit a GEV on each massif
massif_to_maxima = {massif_name: [] for massif_name in safran_massif_names}
for year, snowfall_amount in self.year_to_snowfall_amount.items():
snowfall_amount = np.array(snowfall_amount)
print(snowfall_amount.shape)
max_snowfall_amount = snowfall_amount.max(axis=0)
# todo: take the daily maxima
print(max_snowfall_amount.shape)
for i, massif in self.int_to_massif.items():
massif_to_maxima[massif.name].append(max_snowfall_amount[i])
print(massif_to_maxima)
# Fit a gev n each massif
# todo: find another for the gev fit
massif_to_gev_mle = {massif: gev_mle_fit(np.array(massif_to_maxima[massif])) for massif in safran_massif_names}
# Visualize the massif
# todo: adapt the color depending on the value (create 3 plots, one for each gev aprameters) REFACTOR THE CODE
self.visualize(safran_massif_names)
def visualize(self, safran_massif_names):
# Map each index to the correspond centroid
df_centroid = pd.read_csv(op.join(self.map_full_path, 'coordonnees_massifs_alpes.csv'))
for coord_column in [AbstractCoordinates.COORDINATE_X, AbstractCoordinates.COORDINATE_Y]:
df_centroid.loc[:, coord_column] = df_centroid[coord_column].str.replace(',', '.').astype(float)
coordinate_massif_names = set(df_centroid['NOM'])
# Assert that the massif names are the same between SAFRAN and the coordinates
assert not set(safran_massif_names).symmetric_difference(coordinate_massif_names)
# todo: the coordinate are probably in Lambert extended
# Build coordinate object
print(df_centroid.dtypes)
coordinate = AbstractSpatialCoordinates.from_df(df_centroid)
# Load the
df_massif = pd.read_csv(op.join(self.map_full_path, 'massifsalpes.csv'))
coord_tuples = [(row_massif['idx'], row_massif[AbstractCoordinates.COORDINATE_X],
row_massif[AbstractCoordinates.COORDINATE_Y])
for _, row_massif in df_massif.iterrows()]
for massif_idx in set([tuple[0] for tuple in coord_tuples]):
l = [coords for idx, *coords in coord_tuples if idx == massif_idx]
l = list(zip(*l))
plt.plot(*l, color='black')
plt.fill(*l)
coordinate.visualization_2D()
def new_year_day(self, year):
return datetime(year=year, month=8, day=1, hour=6)
def year_to_daily_maxima(self, nb_days=1):
deltatime = timedelta(days=nb_days)
start_time, *_, end_time = self.datetime_to_snowfall_amount.keys()
total_snowfall = np.zeros(shape=(len(self.datetime_to_snowfall_amount), len(self.int_to_massif)))
print(total_snowfall.shape)
# for initial_time in self.datetime_to_snowfall_amount.keys():
# final_time = initial_time + deltatime
# new_year = self.new_year_day(initial_time.year)
# # two dates should correspond to the same year
# if not ((final_time > new_year) and (initial_time < new_year)):
#
# print(start_time)
# print(end_time)
@staticmethod
def str_to_year(s):
return int(s.split('_')[1][:4])
@property
def full_path(self) -> str:
return get_full_path(relative_path=self.relative_path)
@property
def relative_path(self) -> str:
return r'local/spatio_temporal_datasets'
@property
def safran_altitude(self) -> int:
raise NotImplementedError
@property
def safran_full_path(self) -> str:
return op.join(self.full_path, 'safran-crocus_{}'.format(self.safran_altitude), 'Safran')
@property
def map_full_path(self):
return op.join(self.full_path, 'map')
class Safran1800(Safran):
@property
def safran_altitude(self) -> int:
return 1800
class Safran2400(Safran):
@property
def safran_altitude(self) -> int:
return 2400
class Massif(object):
def __init__(self, name: str, id: int, lat: float, lon: float) -> None:
self.lon = lon
self.lat = lat
self.id = id
self.name = name
@classmethod
def from_str(cls, s: str):
name, id, lat, lon = s.split(',')
return cls(name.strip(), int(id), float(lat), float(lon))
if __name__ == '__main__':
safran_object = Safran1800()
# print(safran_object.year_to_daily_maxima(nb_days=3))
import os
import os.path as op
from collections import OrderedDict
import matplotlib.pyplot as plt
import pandas as pd
from netCDF4 import Dataset
from extreme_estimator.gev.gevmle_fit import GevMleFit
from safran_study.massif import safran_massif_names_from_datasets
from safran_study.snowfall_annual_maxima import SafranSnowfall
from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates
from spatio_temporal_dataset.coordinates.spatial_coordinates.abstract_spatial_coordinates import \
AbstractSpatialCoordinates
from utils import get_full_path, cached_property
class Safran(object):
def __init__(self, nb_days_of_snowfall=1):
self.safran_altitude = None
self.nb_days_of_snowfall = nb_days_of_snowfall
def write_to_file(self, df):
if not op.exists(self.result_full_path):
os.makedirs(self.result_full_path, exist_ok=True)
df.to_csv(op.join(self.result_full_path, 'merged_array_{}_altitude.csv'.format(self.safran_altitude)))
""" Visualization methods """
def visualize(self):
df_massif = pd.read_csv(op.join(self.map_full_path, 'massifsalpes.csv'))
coord_tuples = [(row_massif['idx'], row_massif[AbstractCoordinates.COORDINATE_X],
row_massif[AbstractCoordinates.COORDINATE_Y])
for _, row_massif in df_massif.iterrows()]
for massif_idx in set([tuple[0] for tuple in coord_tuples]):
l = [coords for idx, *coords in coord_tuples if idx == massif_idx]
l = list(zip(*l))
plt.plot(*l, color='black')
plt.fill(*l)
self.massifs_coordinates.visualization_2D()
""" Statistics methods """
@property
def df_gev_mle_each_massif(self):
# Fit a gev n each massif
massif_to_gev_mle = {massif_name: GevMleFit(self.df_annual_maxima[massif_name]).gev_params.to_serie()
for massif_name in self.safran_massif_names}
return pd.DataFrame(massif_to_gev_mle)
""" Annual maxima of snowfall """
@property
def df_annual_maxima(self):
return pd.DataFrame(self.year_to_annual_maxima, index=self.safran_massif_names).T
""" Load some attributes only once """
@cached_property
def year_to_annual_maxima(self):
year_to_safran_snowfall = {year: SafranSnowfall(dataset) for year, dataset in
self.year_to_dataset_ordered_dict.items()}
year_to_annual_maxima = OrderedDict()
for year in self.year_to_dataset_ordered_dict.keys():
year_to_annual_maxima[year] = year_to_safran_snowfall[year].annual_maxima_of_snowfall(
self.nb_days_of_snowfall)
return year_to_annual_maxima
@cached_property
def safran_massif_names(self):
# Load the names of the massif as defined by SAFRAN
return safran_massif_names_from_datasets(self.year_to_dataset_ordered_dict.values())
@cached_property
def year_to_dataset_ordered_dict(self) -> OrderedDict:
# Map each year to the correspond netCDF4 Dataset
year_to_dataset = OrderedDict()
nc_files = [(int(f.split('_')[1][:4]), f) for f in os.listdir(self.safran_full_path) if f.endswith('.nc')]
for year, nc_file in sorted(nc_files, key=lambda t: t[0]):
year_to_dataset[year] = Dataset(op.join(self.safran_full_path, nc_file))
return year_to_dataset
@cached_property
def massifs_coordinates(self) -> AbstractSpatialCoordinates:
# Coordinate object that represents the massif coordinates in Lambert extended
df_centroid = pd.read_csv(op.join(self.map_full_path, 'coordonnees_massifs_alpes.csv'))
for coord_column in [AbstractCoordinates.COORDINATE_X, AbstractCoordinates.COORDINATE_Y]:
df_centroid.loc[:, coord_column] = df_centroid[coord_column].str.replace(',', '.').astype(float)
# Assert that the massif names are the same between SAFRAN and the coordinate file
assert not set(self.safran_massif_names).symmetric_difference(set(df_centroid['NOM']))
# Build coordinate object from df_centroid
return AbstractSpatialCoordinates.from_df(df_centroid)
""" Some properties """
@property
def relative_path(self) -> str:
return r'local/spatio_temporal_datasets'
@property
def full_path(self) -> str:
return get_full_path(relative_path=self.relative_path)
@property
def safran_full_path(self) -> str:
return op.join(self.full_path, 'safran-crocus_{}'.format(self.safran_altitude), 'Safran')
@property
def map_full_path(self) -> str:
return op.join(self.full_path, 'map')
@property
def result_full_path(self) -> str:
return op.join(self.full_path, 'results')
\ No newline at end of file
from utils import first
class Massif(object):
def __init__(self, name: str, id: int, lat: float, lon: float) -> None:
self.lon = lon
self.lat = lat
self.id = id
self.name = name
@classmethod
def from_str(cls, s: str):
name, id, lat, lon = s.split(',')
return cls(name.strip(), int(id), float(lat), float(lon))
def safran_massif_names_from_datasets(datasets):
# Assert the all the datasets have the same indexing for the massif
assert len(set([dataset.massifsList for dataset in datasets])) == 1
# List of the name of the massif used by all the SAFRAN datasets
return [Massif.from_str(massif_str).name for massif_str in first(datasets).massifsList.split('/')]
\ No newline at end of file
import pandas as pd
from safran_study.abstract_safran import Safran
from utils import VERSION
class Safran1800(Safran):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.safran_altitude = 1800
class Safran2400(Safran):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.safran_altitude = 2400
def fit_mle_gev_for_all_safran_and_different_days():
# Dump the result in a csv
dfs = []
for safran_class in [Safran1800, Safran2400]:
for nb_day in [1, 3, 7]:
print('here')
safran = safran_class(nb_day)
df = safran.df_gev_mle_each_massif
df.index += ' Safran{} with {} days'.format(safran.safran_altitude, safran.nb_days_of_snowfall)
dfs.append(df)
df = pd.concat(dfs)
path = r'/home/erwan/Documents/projects/spatiotemporalextremes/local/spatio_temporal_datasets/results/fit_mle_massif/fit_mle_gev_{}.csv'
df.to_csv(path.format(VERSION))
if __name__ == '__main__':
fit_mle_gev_for_all_safran_and_different_days()
\ No newline at end of file
import numpy as np
class SafranSnowfall(object):
""""
Hypothesis:
-How to count how much snowfall in one hour ?
I take the average between the rhythm of snowfall per second between the start and the end
and multiply that by 60 x 60 which corresponds to the number of seconds in one hour
-How do how I define the limit of a day ?
From the start, i.e. in August at 4am something like that,then if I add a 24H duration, I arrive to the next day
-How do you aggregate several days ?
We aggregate all the N consecutive days into a value x_i, then we take the max
(but here the problem might be that the x_i are not idnependent, they are highly dependent one from another)
"""
def __init__(self, dataset):
self.dataset = dataset
# Corresponding starting times
# start_datetime = datetime(year=year, month=8, day=1, hour=6)
# start_times = np.array(dataset.variables['time'])[:-1]
# Compute the daily snowfall in kg/m2
snowfall_rates = np.array(dataset.variables['Snowf'])
mean_snowfall_rates = 0.5 * (snowfall_rates[:-1] + snowfall_rates[1:])
hourly_snowfall = 60 * 60 * mean_snowfall_rates
# Transform the snowfall amount into a dataframe
nb_days = len(hourly_snowfall) // 24
self.daily_snowfall = [sum(hourly_snowfall[24 * i:24 * (i+1)]) for i in range(nb_days)]
def annual_maxima_of_snowfall(self, nb_days_of_snowfall=1):
# Aggregate the daily snowfall by the number of consecutive days
shifted_list = [self.daily_snowfall[i:] for i in range(nb_days_of_snowfall)]
snowfall_in_consecutive_days = [sum(e) for e in zip(*shifted_list)]
# Return the maximum of the aggregated list
return np.array(snowfall_in_consecutive_days).max(axis=0)
import unittest
from test.test_utils import load_safran_objects
class TestFullEstimators(unittest.TestCase):
def test_gev_mle_per_massif(self):
safran_1800_one_day = load_safran_objects()[0]
df = safran_1800_one_day.df_gev_mle_each_massif
self.assertAlmostEqual(df.values.sum(), 1131.4551665871832)
if __name__ == '__main__':
unittest.main()
...@@ -7,6 +7,7 @@ from extreme_estimator.extreme_models.max_stable_model.abstract_max_stable_model ...@@ -7,6 +7,7 @@ from extreme_estimator.extreme_models.max_stable_model.abstract_max_stable_model
AbstractMaxStableModelWithCovarianceFunction, CovarianceFunction AbstractMaxStableModelWithCovarianceFunction, CovarianceFunction
from extreme_estimator.extreme_models.max_stable_model.max_stable_models import Smith, BrownResnick, Schlather, \ from extreme_estimator.extreme_models.max_stable_model.max_stable_models import Smith, BrownResnick, Schlather, \
Geometric, ExtremalT, ISchlather Geometric, ExtremalT, ISchlather
from safran_study.safran import Safran2400, Safran1800
from spatio_temporal_dataset.coordinates.spatial_coordinates.alps_station_3D_coordinates import \ from spatio_temporal_dataset.coordinates.spatial_coordinates.alps_station_3D_coordinates import \
AlpsStation3DCoordinatesWithAnisotropy AlpsStation3DCoordinatesWithAnisotropy
from spatio_temporal_dataset.coordinates.spatial_coordinates.generated_spatial_coordinates import \ from spatio_temporal_dataset.coordinates.spatial_coordinates.generated_spatial_coordinates import \
...@@ -80,3 +81,9 @@ def load_test_spatiotemporal_coordinates(nb_points, nb_steps, train_split_ratio= ...@@ -80,3 +81,9 @@ def load_test_spatiotemporal_coordinates(nb_points, nb_steps, train_split_ratio=
return [coordinate_class.from_nb_points_and_nb_steps(nb_points=nb_points, nb_steps=nb_steps, return [coordinate_class.from_nb_points_and_nb_steps(nb_points=nb_points, nb_steps=nb_steps,
train_split_ratio=train_split_ratio) train_split_ratio=train_split_ratio)
for coordinate_class in TEST_SPATIO_TEMPORAL_COORDINATES] for coordinate_class in TEST_SPATIO_TEMPORAL_COORDINATES]
def load_safran_objects():
nb_days_list = [1, 3, 5][:1]
safran_classes = [Safran1800, Safran2400][:1]
return [safran_class(nb_days) for safran_class in safran_classes for nb_days in nb_days_list]
\ No newline at end of file
import datetime
import os.path as op import os.path as op
VERSION = datetime.datetime.now()
def get_root_path() -> str: def get_root_path() -> str:
return op.dirname(op.abspath(__file__)) return op.dirname(op.abspath(__file__))
...@@ -12,3 +14,48 @@ def get_full_path(relative_path: str) -> str: ...@@ -12,3 +14,48 @@ def get_full_path(relative_path: str) -> str:
def get_display_name_from_object_type(object_type): def get_display_name_from_object_type(object_type):
# assert isinstance(object_type, type), object_type # assert isinstance(object_type, type), object_type
return str(object_type).split('.')[-1].split("'")[0] return str(object_type).split('.')[-1].split("'")[0]
def first(s):
"""Return the first element from an ordered collection
or an arbitrary element from an unordered collection.
Raise StopIteration if the collection is empty.
"""
return next(iter(s))
class cached_property(object):
"""
Descriptor (non-data) for building an attribute on-demand on first use.
From: https://stackoverflow.com/questions/4037481/caching-attributes-of-classes-in-python
"""
def __init__(self, factory):
"""
<factory> is called such: factory(instance) to build the attribute.
"""
self._attr_name = factory.__name__
self._factory = factory
def __get__(self, instance, owner):
# Build the attribute.
attr = self._factory(instance)
# Cache the value; hide ourselves.
setattr(instance, self._attr_name, attr)
return attr
class Example(object):
@cached_property
def big_attribute(self):
print('Long loading only once...')
return 2
if __name__ == '__main__':
e = Example()
print(e.big_attribute)
print(e.big_attribute)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment