Commit 0bfb3355 authored by Le Roux Erwan's avatar Le Roux Erwan
Browse files

[SCM][NON STATIONARY TRENDS] add cpu multiprocessing to speed up computation

parent 5349f19a
No related merge requests found
Showing with 41 additions and 24 deletions
+41 -24
......@@ -96,7 +96,7 @@ def normal_visualization(temporal_non_stationarity=False):
study_visualizer = StudyVisualizer(study, save_to_file=save_to_file,
temporal_non_stationarity=temporal_non_stationarity)
# study_visualizer.visualize_independent_margin_fits(threshold=[None, 20, 40, 60][0])
# study_visualizer.visualize_annual_mean_values()
study_visualizer.visualize_annual_mean_values()
study_visualizer.visualize_linear_margin_fit(only_first_max_stable=None)
......@@ -116,17 +116,17 @@ def complete_analysis(only_first_one=False):
def trend_analysis():
save_to_file = True
only_first_one = False
save_to_file = False
only_first_one = True
short_altitudes = [300, 1200, 2100, 3000][:1]
full_altitude_with_at_least_2_stations = [0, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000, 3300, 3600, 3900, 4200][:]
full_altitude_with_at_least_2_stations = [0, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000, 3300, 3600, 3900, 4200][-1:]
altitudes = full_altitude_with_at_least_2_stations
normalization_class = [None, BetweenMinusOneAndOneNormalization, BetweenZeroAndOneNormalization][-1]
study_classes = [CrocusSwe, CrocusDepth, SafranSnowfall, SafranRainfall, SafranTemperature][:]
for study in study_iterator_global(study_classes, only_first_one=only_first_one, altitudes=altitudes):
study_visualizer = StudyVisualizer(study, save_to_file=save_to_file,
transformation_class=normalization_class)
study_visualizer.visualize_temporal_trend_relevance(complete_analysis=True)
study_visualizer.visualize_temporal_trend_relevance(complete_analysis=True, multiprocessing=True)
if __name__ == '__main__':
......
import time
from multiprocessing import Pool
from typing import Union
from experiment.meteo_france_SCM_study.visualization.utils import align_yaxis_on_zero
......@@ -20,8 +21,10 @@ from utils import get_display_name_from_object_type
class AbstractNonStationaryTrendTest(object):
RESULT_ATTRIBUTE_METRIC = 'deviance'
def __init__(self, dataset: AbstractDataset, verbose, estimator_class,
stationary_margin_model_class, non_stationary_margin_model_class):
def __init__(self, dataset: AbstractDataset, estimator_class,
stationary_margin_model_class, non_stationary_margin_model_class,
verbose=False,
multiprocessing=False):
self.verbose = verbose
self.dataset = dataset
self.estimator_class = estimator_class
......@@ -30,28 +33,35 @@ class AbstractNonStationaryTrendTest(object):
# Compute a dictionary that maps couple (margin model class, starting point)
# to the corresponding fitted estimator
self._margin_model_class_and_starting_point_to_estimator = {}
# parallelization arguments
self.multiprocessing = multiprocessing
self.nb_cores = 7
def get_estimator(self, margin_model_class, starting_point) -> Union[
AbstractFullEstimator, AbstractMarginEstimator]:
if (margin_model_class, starting_point) not in self._margin_model_class_and_starting_point_to_estimator:
margin_model = margin_model_class(coordinates=self.dataset.coordinates, starting_point=starting_point)
estimator = self._load_estimator(margin_model)
if self.verbose:
estimator_name = get_display_name_from_object_type(estimator)
margin_model_name = get_display_name_from_object_type(margin_model)
print('Fitting {} with margin: {} for starting_point={}'.format(estimator_name, margin_model_name, starting_point))
start = time.time()
estimator.fit()
duration = time.time() - start
if self.verbose:
print('Fit took {}s and was {}'.format(round(duration, 1), estimator.result_from_fit.convergence))
if self.verbose:
estimator_name = get_display_name_from_object_type(estimator)
margin_model_name = get_display_name_from_object_type(margin_model)
text = 'Fittig {} with margin: {} for starting_point={}'.format(estimator_name,
margin_model_name,
starting_point)
text += 'Fit took {}s and was {}'.format(round(duration, 1), estimator.result_from_fit.convergence)
print(text)
self._margin_model_class_and_starting_point_to_estimator[(margin_model_class, starting_point)] = estimator
return self._margin_model_class_and_starting_point_to_estimator[(margin_model_class, starting_point)]
def _load_estimator(self, margin_model) -> Union[AbstractFullEstimator, AbstractMarginEstimator]:
return self.estimator_class(self.dataset, margin_model)
def get_metric(self, margin_model_class, starting_point):
def get_metric(self, starting_point):
margin_model_class = self.stationary_margin_model_class if starting_point is None else self.non_stationary_margin_model_class
estimator = self.get_estimator(margin_model_class, starting_point)
metric = estimator.result_from_fit.__getattribute__(self.RESULT_ATTRIBUTE_METRIC)
assert isinstance(metric, float)
......@@ -67,9 +77,15 @@ class AbstractNonStationaryTrendTest(object):
def visualize(self, ax, complete_analysis=True):
years = self.years(complete_analysis)
# Compute metric with parallelization or not
if self.multiprocessing:
with Pool(self.nb_cores) as p:
stationary_metric, *non_stationary_metrics = p.map(self.get_metric, [None] + years)
else:
stationary_metric = self.get_metric(starting_point=None)
non_stationary_metrics = [self.get_metric(year) for year in years]
# Plot differences
stationary_metric = self.get_metric(self.stationary_margin_model_class, starting_point=None)
non_stationary_metrics = [self.get_metric(self.non_stationary_margin_model_class, year) for year in years]
difference = [m - stationary_metric for m in non_stationary_metrics]
color_difference = 'b'
ax.plot(years, difference, color_difference + 'o-')
......@@ -123,9 +139,8 @@ class IndependenceLocationTrendTest(AbstractNonStationaryTrendTest):
class ConditionalIndedendenceLocationTrendTest(AbstractNonStationaryTrendTest):
def __init__(self, dataset, verbose=False):
super().__init__(dataset=dataset,
verbose=verbose,
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs,
estimator_class=LinearMarginEstimator,
stationary_margin_model_class=LinearStationaryMarginModel,
non_stationary_margin_model_class=LinearNonStationaryLocationMarginModel)
......@@ -137,9 +152,8 @@ class ConditionalIndedendenceLocationTrendTest(AbstractNonStationaryTrendTest):
class MaxStableLocationTrendTest(AbstractNonStationaryTrendTest):
def __init__(self, dataset, max_stable_model, verbose=False):
super().__init__(dataset=dataset,
verbose=verbose,
def __init__(self, max_stable_model, *args, **kwargs):
super().__init__(*args, **kwargs,
estimator_class=FullEstimatorInASingleStepWithSmoothMargin,
stationary_margin_model_class=LinearStationaryMarginModel,
non_stationary_margin_model_class=LinearNonStationaryLocationMarginModel)
......
......@@ -164,13 +164,16 @@ class StudyVisualizer(object):
'for the year {}'.format(self.year_for_kde_plot)
self.show_or_save_to_file()
def visualize_temporal_trend_relevance(self, complete_analysis, verbose=True):
def visualize_temporal_trend_relevance(self, complete_analysis, verbose=True, multiprocessing=False):
self.temporal_non_stationarity = True
trend_tests = [ConditionalIndedendenceLocationTrendTest(self.dataset, verbose=verbose)]
trend_tests = [ConditionalIndedendenceLocationTrendTest(dataset=self.dataset, verbose=verbose,
multiprocessing=multiprocessing)]
max_stable_models = load_test_max_stable_models(default_covariance_function=self.default_covariance_function)
for max_stable_model in [max_stable_models[1], max_stable_models[-2]]:
trend_tests.append(MaxStableLocationTrendTest(self.dataset, max_stable_model, verbose=verbose))
trend_tests.append(MaxStableLocationTrendTest(dataset=self.dataset,
max_stable_model=max_stable_model, verbose=verbose,
multiprocessing=multiprocessing))
nb_trend_tests = len(trend_tests)
fig, axes = plt.subplots(1, nb_trend_tests, figsize=self.figsize)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment