diff --git a/experiment/meteo_france_data/scm_models_data/visualization/main_study_visualizer.py b/experiment/meteo_france_data/scm_models_data/visualization/main_study_visualizer.py index 2f9e1a1ab2cd561822d7b00def266a8e39534ea8..079ceeec615312ae0e7676d3ebba489b44379a7e 100644 --- a/experiment/meteo_france_data/scm_models_data/visualization/main_study_visualizer.py +++ b/experiment/meteo_france_data/scm_models_data/visualization/main_study_visualizer.py @@ -1,29 +1,24 @@ import time +from collections import OrderedDict from typing import List -from experiment.meteo_france_data.scm_models_data.crocus.crocus_variables import CrocusDensityVariable -from experiment.meteo_france_data.scm_models_data.visualization.study_visualizer import \ - StudyVisualizer -from projects.exceeding_snow_loads.discussion_data_comparison_with_eurocode.crocus_study_comparison_with_eurocode import \ - CrocusDifferenceSnowLoad, \ - CrocusSnowDensityAtMaxofSwe, CrocusDifferenceSnowLoadRescaledAndEurocodeToSeeSynchronization, \ - CrocusSnowDepthDifference, CrocusSnowDepthAtMaxofSwe -from experiment.trend_analysis.abstract_score import MannKendall from experiment.meteo_france_data.scm_models_data.abstract_study import AbstractStudy from experiment.meteo_france_data.scm_models_data.crocus.crocus import CrocusDepth, CrocusSweTotal, ExtendedCrocusDepth, \ ExtendedCrocusSweTotal, CrocusDaysWithSnowOnGround, CrocusSwe3Days, CrocusSnowLoad3Days, CrocusSnowLoadTotal, \ CrocusSnowLoadEurocode, CrocusSnowLoad5Days, CrocusSnowLoad7Days +from experiment.meteo_france_data.scm_models_data.crocus.crocus_variables import CrocusDensityVariable from experiment.meteo_france_data.scm_models_data.safran.safran import SafranSnowfall, ExtendedSafranSnowfall, \ SafranRainfall, \ SafranTemperature, SafranPrecipitation - -from collections import OrderedDict - -from experiment.trend_analysis.univariate_test.extreme_trend_test.trend_test_one_parameter.gev_trend_test_one_parameter import \ - GevLocationTrendTest -from spatio_temporal_dataset.coordinates.transformed_coordinates.transformation.uniform_normalization import \ - BetweenZeroAndOneNormalization, BetweenMinusOneAndOneNormalization +from experiment.meteo_france_data.scm_models_data.visualization.study_visualizer import \ + StudyVisualizer +from projects.exceeding_snow_loads.discussion_data_comparison_with_eurocode.crocus_study_comparison_with_eurocode import \ + CrocusDifferenceSnowLoad, \ + CrocusSnowDensityAtMaxofSwe, CrocusDifferenceSnowLoadRescaledAndEurocodeToSeeSynchronization, \ + CrocusSnowDepthDifference, CrocusSnowDepthAtMaxofSwe from root_utils import get_display_name_from_object_type +from spatio_temporal_dataset.coordinates.transformed_coordinates.transformation.uniform_normalization import \ + BetweenZeroAndOneNormalization snow_density_str = '$\\rho_{SNOW}$' eurocode_snow_density = '{}=150 {}'.format(snow_density_str, CrocusDensityVariable.UNIT) @@ -154,27 +149,6 @@ def annual_mean_vizu_compare_durand_study(safran=True, take_mean_value=True, alt study_visualizer.visualize_annual_mean_values(take_mean_value=take_mean_value) -def max_stable_process_vizu_compare_gaume_study(altitude=1800, nb_days=1): - study = SafranSnowfall(altitude=altitude, nb_consecutive_days=nb_days) - study_visualizer = StudyVisualizer(study) - study_visualizer.visualize_brown_resnick_fit() - - -def normal_visualization(temporal_non_stationarity=False): - save_to_file = False - only_first_one = True - # for study_class in SCM_STUDIES[:1]: - for study_class in [CrocusDepth, SafranSnowfall, SafranRainfall, SafranTemperature][1:2]: - for study in study_iterator(study_class, only_first_one=only_first_one, altitudes=[300]): - study_visualizer = StudyVisualizer(study, save_to_file=save_to_file, - temporal_non_stationarity=temporal_non_stationarity) - print(study_visualizer.massif_name_to_scores) - # study_visualizer.visualize_all_mean_and_max_graphs() - - # study_visualizer.visualize_independent_margin_fits(threshold=[None, 20, 40, 60][0]) - # study_visualizer.visualize_annual_mean_values() - - def all_normal_vizu(): for study in study_iterator_global(study_classes=ALL_STUDIES, only_first_one=False, altitudes=ALL_ALTITUDES): study_visualizer = StudyVisualizer(study, save_to_file=True, temporal_non_stationarity=True) @@ -193,25 +167,6 @@ def case_study(): print(y) -def scores_vizu(): - save_to_file = False - only_first_one = True - for study in study_iterator_global(study_classes=ALL_STUDIES, only_first_one=only_first_one, altitudes=[1800]): - study_visualizer = StudyVisualizer(study, save_to_file=save_to_file, temporal_non_stationarity=True) - # study_visualizer.visualize_all_score_wrt_starting_year() - study_visualizer.visualize_all_score_wrt_starting_year() - - -def all_scores_vizu(): - save_to_file = True - only_first_one = False - for study in study_iterator_global(study_classes=[SafranSnowfall], only_first_one=only_first_one, - altitudes=ALL_ALTITUDES): - study_visualizer = StudyVisualizer(study, save_to_file=save_to_file, temporal_non_stationarity=True, - verbose=True) - # study_visualizer.visualize_all_mean_and_max_graphs() - study_visualizer.visualize_all_score_wrt_starting_year() - def complete_analysis(only_first_one=False): """An overview of everything that is possible with study OR extended study""" @@ -228,29 +183,6 @@ def complete_analysis(only_first_one=False): study_visualizer.visualize_linear_margin_fit() -def trend_analysis(): - save_to_file = True - only_first_one = False - short_altitudes = [300, 1200, 2100, 3000][:1] - full_altitude_with_at_least_2_stations = [0, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000, 3300, 3600, - 3900, 4200][:] - durand_altitude = [1800] - altitudes = durand_altitude - normalization_class = [None, BetweenMinusOneAndOneNormalization, BetweenZeroAndOneNormalization][-1] - study_classes = [CrocusSweTotal, CrocusDepth, SafranSnowfall, SafranRainfall, SafranTemperature][2:3] - for study in study_iterator_global(study_classes, only_first_one=only_first_one, altitudes=altitudes): - study_visualizer = StudyVisualizer(study, save_to_file=save_to_file, - transformation_class=normalization_class, - temporal_non_stationarity=True, - verbose=True, - multiprocessing=True, - complete_non_stationary_trend_analysis=True) - # study_visualizer.visualize_all_independent_temporal_trend() - # study_visualizer.visualize_temporal_trend_relevance() - study_visualizer.df_trend_spatio_temporal(GevLocationTrendTest, - starting_years=[1958, 1980], nb_massif_for_fast_mode=2) - - def maxima_analysis(): save_to_file = False only_first_one = True @@ -264,8 +196,7 @@ def maxima_analysis(): temporal_non_stationarity=True, verbose=True, multiprocessing=True, - complete_non_stationary_trend_analysis=True, - score_class=MannKendall) + complete_non_stationary_trend_analysis=True) # study_visualizer.visualize_all_score_wrt_starting_year() # study_visualizer.visualize_all_independent_temporal_trend() # study_visualizer.visualize_independent_margin_fits() @@ -300,23 +231,8 @@ def altitude_analysis(): def main_run(): - # normal_visualization(temporal_non_stationarity=True) - # trend_analysis() - - # altitude_analysis() max_graph_annual_maxima_poster() - # maxima_analysis() - # case_study() - # all_scores_vizu() - # maxima_analysis() - # all_normal_vizu() - - # annual_mean_vizu_compare_durand_study(safran=True, take_mean_value=True, altitude=2100) - - # max_stable_process_vizu_compare_gaume_study(altitude=1800, nb_days=1) - # extended_visualization() - # complete_analysis() - # scores_vizu() + if __name__ == '__main__': diff --git a/experiment/meteo_france_data/scm_models_data/visualization/study_visualizer.py b/experiment/meteo_france_data/scm_models_data/visualization/study_visualizer.py index 3ee9fae7191b02523c3700c585b7291766dd05e6..866156accbdd77a5141e0b7807b6ac992d7cccc4 100644 --- a/experiment/meteo_france_data/scm_models_data/visualization/study_visualizer.py +++ b/experiment/meteo_france_data/scm_models_data/visualization/study_visualizer.py @@ -15,12 +15,8 @@ import seaborn as sns from extreme_fit.model.result_from_model_fit.result_from_extremes.eurocode_return_level_uncertainties import \ EurocodeConfidenceIntervalFromExtremes, compute_eurocode_confidence_interval from experiment.meteo_france_data.scm_models_data.abstract_extended_study import AbstractExtendedStudy -from experiment.trend_analysis.abstract_score import MeanScore, AbstractTrendScore from experiment.meteo_france_data.scm_models_data.abstract_study import AbstractStudy -from experiment.trend_analysis.non_stationary_trends import \ - ConditionalIndedendenceLocationTrendTest, MaxStableLocationTrendTest, IndependenceLocationTrendTest from experiment.meteo_france_data.scm_models_data.visualization.utils import create_adjusted_axes -from experiment.trend_analysis.univariate_test.univariate_test_results import compute_gev_change_point_test_results from extreme_fit.distribution.abstract_params import AbstractParams from extreme_fit.estimator.full_estimator.abstract_full_estimator import \ FullEstimatorInASingleStepWithSmoothMargin @@ -75,8 +71,7 @@ class StudyVisualizer(VisualizationParameters): def __init__(self, study: AbstractStudy, show=True, save_to_file=False, only_one_graph=False, only_first_row=False, vertical_kde_plot=False, year_for_kde_plot=None, plot_block_maxima_quantiles=False, temporal_non_stationarity=False, transformation_class=None, verbose=False, multiprocessing=False, - complete_non_stationary_trend_analysis=False, normalization_under_one_observations=True, - score_class=MeanScore): + complete_non_stationary_trend_analysis=False, normalization_under_one_observations=True): super().__init__(save_to_file, only_one_graph, only_first_row, show) self.nb_cores = 7 self.massif_id_to_smooth_maxima = {} @@ -107,8 +102,6 @@ class StudyVisualizer(VisualizationParameters): self.window_size_for_smoothing = 1 # other value could be self.number_of_top_values = 10 # 1 if we just want the maxima - self.score_class = score_class - self.score = self.score_class(self.number_of_top_values) # type: AbstractTrendScore # Modify some class attributes # Remove some assert @@ -207,50 +200,6 @@ class StudyVisualizer(VisualizationParameters): ax = axes[row_id, column_id] visualize_function(ax, massif_id) - # TEMPORAL TREND - - def visualize_all_independent_temporal_trend(self): - massifs_ids = [self.study.study_massif_names.index(name) for name in self.specified_massif_names_median_scores] - self.visualize_massif_graphs(self.visualize_independent_temporal_trend, specified_massif_ids=massifs_ids) - self.plot_name = ' Independent temporal trend \n' - self.show_or_save_to_file() - - def visualize_independent_temporal_trend(self, ax, massif_id): - assert self.temporal_non_stationarity - # Create a dataset with temporal coordinates from the massif id - dataset_massif_id = AbstractDataset(self.observation_massif_id(massif_id), self.temporal_coordinates) - trend_test = IndependenceLocationTrendTest(station_name=self.study.study_massif_names[massif_id], - dataset=dataset_massif_id, verbose=self.verbose, - multiprocessing=self.multiprocessing) - trend_test.visualize(ax, self.complete_non_stationary_trend_analysis) - - def visualize_temporal_trend_relevance(self): - assert self.temporal_non_stationarity - trend_tests = [ConditionalIndedendenceLocationTrendTest(dataset=self.dataset, verbose=self.verbose, - multiprocessing=self.multiprocessing)] - - max_stable_models = load_test_max_stable_models(default_covariance_function=self.default_covariance_function) - for max_stable_model in [max_stable_models[1], max_stable_models[-2]]: - trend_tests.append(MaxStableLocationTrendTest(dataset=self.dataset, - max_stable_model=max_stable_model, verbose=self.verbose, - multiprocessing=self.multiprocessing)) - - nb_trend_tests = len(trend_tests) - fig, axes = plt.subplots(1, nb_trend_tests, figsize=self.figsize) - if nb_trend_tests == 1: - axes = [axes] - fig.subplots_adjust(hspace=self.subplot_space, wspace=self.subplot_space) - for ax, trend_test in zip(axes, trend_tests): - trend_test.visualize(ax, complete_analysis=self.complete_non_stationary_trend_analysis) - - plot_name = 'trend tests' - plot_name += ' with {} applied spatially & temporally'.format( - get_display_name_from_object_type(self.transformation_class)) - if self.normalization_under_one_observations: - plot_name += '(and maxima <= 1)' - self.plot_name = plot_name - self.show_or_save_to_file() - # EXPERIMENTAL LAW def visualize_all_experimental_law(self): @@ -371,200 +320,6 @@ class StudyVisualizer(VisualizationParameters): zip([massif_name for _, massif_name in massif_ids_and_names], altitudes_and_res)) return massif_name_to_eurocode_return_level_uncertainty - # def dep_class_to_eurocode_level_uncertainty(self, model_class, last_year_for_the_data): - # """ Take the max with respect to the posterior mean for each departement """ - # massif_name_to_eurocode_level_uncertainty = self.massif_name_to_eurocode_level_uncertainty(model_class, - # last_year_for_the_data) - # dep_class_to_eurocode_level_uncertainty = {} - # for dep_class, massif_names in dep_class_to_massif_names.items(): - # # Filter the couple of interest - # couples = [(k, v) for k, v in massif_name_to_eurocode_level_uncertainty.items() if k in massif_names] - # assert len(couples) > 0 - # massif_name, eurocode_level_uncertainty = max(couples, key=lambda c: c[1].posterior_mean) - # dep_class_to_eurocode_level_uncertainty[dep_class] = eurocode_level_uncertainty - # return dep_class_to_eurocode_level_uncertainty - - @cached_property - def massif_name_to_detailed_scores(self): - """ - This score respect the following property. - Between two successive score, then if the starting year was neither a top10 maxima nor a top10 minima, - then the score will not change - - The following case for instance gives a constant score wrt to the starting year - because all the maxima and all the minima are at the end - smooth_maxima = [0 for _ in years] - smooth_maxima[-20:-10] = [i for i in range(10)] - smooth_maxima[-10:] = [-i for i in range(10)] - :return: - """ - # Ordered massif by scores - massif_name_to_scores = {} - for massif_id, massif_name in enumerate(self.study.study_massif_names): - years, smooth_maxima = self.smooth_maxima_x_y(massif_id) - detailed_scores = [] - for j, starting_year in enumerate(self.starting_years): - detailed_scores.append(self.score.get_detailed_score(years, smooth_maxima)) - assert years[0] == starting_year, "{} {}".format(years[0], starting_year) - # Remove the first element from the list - years = years[1:] - smooth_maxima = smooth_maxima[1:] - massif_name_to_scores[massif_name] = np.array(detailed_scores) - return massif_name_to_scores - - def massif_name_to_gev_change_point_test_results(self, trend_test_class_for_change_point_test, - starting_years_for_change_point_test, - nb_massif_for_change_point_test=None, - sample_one_massif_from_each_region=True): - if self.trend_test_class_for_change_point_test is None: - # Set the attribute is not already done - self.trend_test_class_for_change_point_test = trend_test_class_for_change_point_test - self.starting_years_for_change_point_test = starting_years_for_change_point_test - self.nb_massif_for_change_point_test = nb_massif_for_change_point_test - self.sample_one_massif_from_each_region = sample_one_massif_from_each_region - else: - # Check that the argument are the same - assert self.trend_test_class_for_change_point_test == trend_test_class_for_change_point_test - assert self.starting_years == starting_years_for_change_point_test - assert self.nb_massif_for_change_point_test == nb_massif_for_change_point_test - assert self.sample_one_massif_from_each_region == sample_one_massif_from_each_region - - return self._massif_name_to_gev_change_point_test_results - - @cached_property - def _massif_name_to_gev_change_point_test_results(self): - massif_name_to_gev_change_point_test_results = {} - if self.nb_massif_for_change_point_test is None: - massif_names = self.study.study_massif_names - else: - # Set the random seed to the same number so that - print('Setting the random seed to ensure similar sampling in the fast mode') - seed(42) - if self.sample_one_massif_from_each_region: - # Get one massif from each region to ensure that the fast plot will not crash - assert self.nb_massif_for_change_point_test >= 4, 'we need at least one massif from each region' - massif_names = [AbstractExtendedStudy.region_name_to_massif_names[r][0] - for r in AbstractExtendedStudy.real_region_names] - massif_names_for_sampling = list(set(self.study.study_massif_names) - set(massif_names)) - nb_massif_for_sampling = self.nb_massif_for_change_point_test - len( - AbstractExtendedStudy.real_region_names) - massif_names += sample(massif_names_for_sampling, k=nb_massif_for_sampling) - else: - massif_names = sample(self.study.study_massif_names, k=self.nb_massif_for_change_point_test) - - for massif_id, massif_name in enumerate(massif_names): - years, smooth_maxima = self.smooth_maxima_x_y(massif_id) - gev_change_point_test_results = compute_gev_change_point_test_results(self.multiprocessing, smooth_maxima, - self.starting_years_for_change_point_test, - self.trend_test_class_for_change_point_test, - years) - massif_name_to_gev_change_point_test_results[massif_name] = gev_change_point_test_results - return massif_name_to_gev_change_point_test_results - - def df_trend_spatio_temporal(self, trend_test_class_for_change_point_test, - starting_years_for_change_point_test, - nb_massif_for_change_point_test=None, - sample_one_massif_from_each_region=True): - """ - Index are the massif - Columns are the starting year - - :param trend_test_class: - :param starting_year_to_weight: - :return: - """ - massif_name_to_trend_res = {} - massif_name_to_gev_change_point_test_results = self.massif_name_to_gev_change_point_test_results( - trend_test_class_for_change_point_test, - starting_years_for_change_point_test, - nb_massif_for_change_point_test, - sample_one_massif_from_each_region) - for massif_name, gev_change_point_test_results in massif_name_to_gev_change_point_test_results.items(): - trend_test_res, best_idxs = gev_change_point_test_results - trend_test_res = [(a, b, c, d, e, f) if i in best_idxs else (np.nan, np.nan, c, np.nan, np.nan, np.nan) - for i, (a, b, c, d, e, f, *_) in enumerate(trend_test_res)] - massif_name_to_trend_res[massif_name] = list(zip(*trend_test_res)) - nb_res = len(list(massif_name_to_trend_res.values())[0]) - assert nb_res == 6 - - all_massif_name_to_res = [{k: v[idx_res] for k, v in massif_name_to_trend_res.items()} - for idx_res in range(nb_res)] - return [pd.DataFrame(massif_name_to_res, index=self.starting_years_for_change_point_test).transpose() - for massif_name_to_res in all_massif_name_to_res] - - @cached_property - def massif_name_to_scores(self): - return {k: v[:, 0] for k, v in self.massif_name_to_detailed_scores.items()} - - @cached_property - def massif_name_to_first_detailed_score(self): - return {k: v[0] for k, v in self.massif_name_to_detailed_scores.items()} - - @cached_property - def massif_name_to_first_score(self): - return {k: v[0] for k, v in self.massif_name_to_scores.items()} - - @property - def specified_massif_names_median_scores(self): - return sorted(self.study.study_massif_names, key=lambda s: np.median(self.massif_name_to_scores[s])) - - @property - def specified_massif_names_first_score(self): - return sorted(self.study.study_massif_names, key=lambda s: self.massif_name_to_scores[s][0]) - - def visualize_all_score_wrt_starting_year(self): - specified_massif_names = self.specified_massif_names_median_scores - # Add one graph at the end - specified_massif_names += [None] - self.visualize_massif_graphs(self.visualize_score_wrt_starting_year, - specified_massif_ids=specified_massif_names) - plot_name = '' - plot_name += '{} top values for each score, abscisse represents starting year for a trend'.format( - self.number_of_top_values) - self.plot_name = plot_name - self.show_or_save_to_file() - - def visualize_score_wrt_starting_year(self, ax, massif_name): - if massif_name is None: - percentage, title = self.percentages_of_negative_trends() - scores = percentage - ax.set_ylabel('% of negative trends') - # Add two lines of interest - years_of_interest = [1963, 1976] - colors = ['g', 'r'] - for year_interest, color in zip(years_of_interest, colors): - ax.axvline(x=year_interest, color=color) - year_score = scores[self.starting_years.index(year_interest)] - ax.axhline(y=year_score, color=color) - else: - ax.set_ylabel(get_display_name_from_object_type(self.score)) - scores = self.massif_name_to_scores[massif_name] - title = massif_name - ax.plot(self.starting_years, scores) - ax.set_title(title) - ax.xaxis.set_ticks(self.starting_years[2::20]) - - def percentages_of_negative_trends(self): - print('start computing percentages negative trends') - # scores = np.median([np.array(v) < 0 for v in self.massif_name_to_scores.values()], axis=0) - # Take the mean with respect to the massifs - # We obtain an array whose length equal the length of starting years - scores = np.mean([np.array(v) < 0 for v in self.massif_name_to_scores.values()], axis=0) - percentages = 100 * scores - # First argmin, first argmax - argmin, argmax = np.argmin(scores), np.argmax(scores) - # Last argmin, last argmax - # argmin, argmax = len(scores) - 1 - np.argmin(scores[::-1]), len(scores) - 1 - np.argmax(scores[::-1]) - top_starting_year_for_positive_trend = self.starting_years[argmin] - top_starting_year_for_negative_trend = self.starting_years[argmax] - top_percentage_positive_trend = round(100 - percentages[argmin], 0) - top_percentage_negative_trend = round(percentages[argmax], 0) - title = "Global trend; > 0: {}% in {}; < 0: {}% in {}".format(top_percentage_positive_trend, - top_starting_year_for_positive_trend, - top_percentage_negative_trend, - top_starting_year_for_negative_trend) - - return percentages, title def visualize_all_mean_and_max_graphs(self): specified_massif_ids = [self.study.study_massif_names.index(massif_name) diff --git a/experiment/meteo_france_data/stations_data/comparisons_visualization.py b/experiment/meteo_france_data/stations_data/comparisons_visualization.py index d04fcc20aad339d9467b683650ac6fb9f854691f..6a50ad295ad6593a32cd86a0be008f39c7c1251c 100644 --- a/experiment/meteo_france_data/stations_data/comparisons_visualization.py +++ b/experiment/meteo_france_data/stations_data/comparisons_visualization.py @@ -14,8 +14,6 @@ from experiment.meteo_france_data.scm_models_data.visualization.study_visualizer VisualizationParameters from experiment.meteo_france_data.stations_data.comparison_analysis import ComparisonAnalysis, MASSIF_COLUMN_NAME, \ REANALYSE_STR, ALTITUDE_COLUMN_NAME, STATION_COLUMN_NAME -from experiment.trend_analysis.univariate_test.abstract_univariate_test import AbstractUnivariateTest -from experiment.trend_analysis.univariate_test.univariate_test_results import compute_gev_change_point_test_results from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates path = r'/data/meteo_france_data/stations_data/csv' @@ -114,10 +112,6 @@ class ComparisonsVisualization(VisualizationParameters): df.to_csv(csv_filepath) if TREND_TYPE_CNAME in df.columns: - # Display the confusion matrix - m = confusion_matrix(y_true=df[TREND_TYPE_CNAME].values, - y_pred=df[SAFRAN_TREND_TYPE_CNAME].values, - labels=AbstractUnivariateTest.three_main_trend_types()) # Display the classification score per massif df[WELL_CLASSIFIED_CNAME] = df[TREND_TYPE_CNAME] == df[SAFRAN_TREND_TYPE_CNAME] @@ -226,25 +220,6 @@ class ComparisonsVisualization(VisualizationParameters): def plot_maxima(self, ax, ax2, years, maxima, label, plot_color): ordered_dict = OrderedDict() - if self.keep_only_station_without_nan_values: - # Run trend test to improve the label - starting_years = years[:-4] - trend_test_res, best_idxs = compute_gev_change_point_test_results(multiprocessing=True, - maxima=maxima, - starting_years=starting_years, - trend_test_class=self.trend_test_class, - years=years) - best_idx = best_idxs[0] - most_likely_year = years[best_idx] - most_likely_trend_type = trend_test_res[best_idx][0] - display_trend_type = AbstractUnivariateTest.get_display_trend_type(real_trend_type=most_likely_trend_type) - label += "\n {} starting in {}".format(display_trend_type, most_likely_year) - ordered_dict[TREND_TYPE_CNAME] = display_trend_type - ordered_dict['most likely year'] = most_likely_year - # Display the deviance against the starting year - step = 1 - ax2.plot(starting_years[::step], [t[4] for t in trend_test_res][::step], color=plot_color, marker='o') - ax2.plot(starting_years[::step], [t[5] for t in trend_test_res][::step], color=plot_color, marker='x') # Plot maxima ax.grid() ax.plot(years, maxima, label=label, color=plot_color) diff --git a/experiment/trend_analysis/abstract_score.py b/experiment/trend_analysis/abstract_score.py deleted file mode 100644 index d5506b51e605fa0cff15d889d4f032265f1575a0..0000000000000000000000000000000000000000 --- a/experiment/trend_analysis/abstract_score.py +++ /dev/null @@ -1,71 +0,0 @@ -import numpy as np - - -class AbstractTrendScore(object): - """A score that should be equal to zero is there is no trend - positive if we suppose a positive trend - negative if we suppose a negative trend - - We don't care what happen before the change point. - All we want to focus on, is the potential trend that could exist in the data after a potential change point""" - - def __init__(self, number_of_top_values=None) -> None: - self.number_of_top_values = number_of_top_values - - def get_detailed_score(self, years_after_change_point, maxima_after_change_point): - raise NotImplementedError - - -class MannKendall(AbstractTrendScore): - # see here for the explanation: https://up-rs-esp.github.io/mkt/ - - def get_detailed_score(self, years_after_change_point, maxima_after_change_point): - score = 0.0 - for i, xi in enumerate(maxima_after_change_point[:-1]): - for xj in maxima_after_change_point[i + 1:]: - score += np.sign(xj - xi) - return [score, score, score] - - -class SortedScore(AbstractTrendScore): - - def __init__(self, number_of_top_values=None) -> None: - super().__init__(number_of_top_values) - assert self.number_of_top_values is not None - - def get_detailed_score(self, years_after_change_point, maxima_after_change_point): - # Get sorted years and sorted maxima - sorted_years, sorted_maxima = zip( - *sorted(zip(years_after_change_point, maxima_after_change_point), key=lambda s: s[1])) - sorted_years, sorted_maxima = list(sorted_years), np.array(sorted_maxima) - year_top_score_max = self.year_from_top_score(sorted_years[-self.number_of_top_values:], - sorted_maxima[-self.number_of_top_values:], top_max=True) - year_top_score_min = self.year_from_top_score(sorted_years[:self.number_of_top_values], - sorted_maxima[:self.number_of_top_values], top_max=False) - score_difference = year_top_score_max - year_top_score_min - return [score_difference, year_top_score_max, year_top_score_min] - - def year_from_top_score(self, top_sorted_years, top_sorted_maxima, top_max=None): - raise NotImplementedError - - -class MeanScore(SortedScore): - - def year_from_top_score(self, top_sorted_years, top_sorted_maxima, top_max=None): - return np.mean(top_sorted_years) - - -class MedianScore(SortedScore): - - def year_from_top_score(self, top_sorted_years, top_sorted_maxima, top_max=None): - return np.median(top_sorted_years) - - -class WeigthedScore(SortedScore): - - def year_from_top_score(self, top_sorted_years, top_sorted_maxima, top_max=None): - assert isinstance(top_max, bool) - if not top_max: - top_sorted_maxima = np.sum(top_sorted_maxima) - top_sorted_maxima - weights = top_sorted_maxima / np.sum(top_sorted_maxima) - return np.sum(weights * top_sorted_years) diff --git a/experiment/trend_analysis/non_stationary_trends.py b/experiment/trend_analysis/non_stationary_trends.py deleted file mode 100644 index cf1a8c8ef715556f71f81ab0820be70d4b6d2459..0000000000000000000000000000000000000000 --- a/experiment/trend_analysis/non_stationary_trends.py +++ /dev/null @@ -1,218 +0,0 @@ -import time -from multiprocessing import Pool -from typing import Union - -import pandas as pd - -from extreme_fit.estimator.abstract_estimator import AbstractEstimator -from scipy.stats import chi2 -from extreme_fit.estimator.full_estimator.abstract_full_estimator import \ - FullEstimatorInASingleStepWithSmoothMargin, AbstractFullEstimator -from extreme_fit.estimator.margin_estimator.abstract_margin_estimator import LinearMarginEstimator, \ - AbstractMarginEstimator -from extreme_fit.model.margin_model.linear_margin_model.linear_margin_model import \ - LinearStationaryMarginModel, LinearNonStationaryLocationMarginModel -from extreme_fit.model.margin_model.linear_margin_model.temporal_linear_margin_models import \ - StationaryTemporalModel, NonStationaryLocationTemporalModel -from extreme_fit.function.margin_function.linear_margin_function import LinearMarginFunction -from extreme_fit.model.utils import OptimizationConstants -from spatio_temporal_dataset.dataset.abstract_dataset import AbstractDataset -from root_utils import get_display_name_from_object_type - - -class AbstractNonStationaryTrendTest(object): - RESULT_ATTRIBUTE_METRIC = 'deviance' - - def __init__(self, dataset: AbstractDataset, estimator_class, - stationary_margin_model_class, non_stationary_margin_model_class, - verbose=False, - multiprocessing=False): - self.verbose = verbose - self.dataset = dataset - self.estimator_class = estimator_class - self.stationary_margin_model_class = stationary_margin_model_class - self.non_stationary_margin_model_class = non_stationary_margin_model_class - # Compute a dictionary that maps couple (margin model class, starting point) - # to the corresponding fitted estimator - self._starting_point_to_estimator = {} - # parallelization arguments - self.multiprocessing = multiprocessing - self.nb_cores = 7 - - def get_estimator(self, starting_point): - if starting_point not in self._starting_point_to_estimator: - estimator = self.load_estimator(starting_point) - self._starting_point_to_estimator[starting_point] = estimator - return self._starting_point_to_estimator[starting_point] - - def load_estimator(self, starting_point) -> Union[AbstractFullEstimator, AbstractMarginEstimator]: - margin_model_class = self.stationary_margin_model_class if starting_point is None else self.non_stationary_margin_model_class - assert starting_point not in self._starting_point_to_estimator - margin_model = margin_model_class(coordinates=self.dataset.coordinates, starting_point=starting_point) - estimator = self._load_estimator(margin_model) - start = time.time() - estimator.fit() - duration = time.time() - start - if self.verbose: - estimator_name = get_display_name_from_object_type(estimator) - margin_model_name = get_display_name_from_object_type(margin_model) - text = 'Fittig {} with margin: {} for starting_point={}\n'.format(estimator_name, - margin_model_name, - starting_point) - text += 'Fit took {}s and was {}'.format(round(duration, 1), estimator.result_from_model_fit.convergence) - print(text) - return estimator - - def _load_estimator(self, margin_model) -> Union[AbstractFullEstimator, AbstractMarginEstimator]: - return self.estimator_class(self.dataset, margin_model) - - def get_metric(self, starting_point): - estimator = self.get_estimator(starting_point) - metric = estimator.result_from_model_fit.__getattribute__(self.RESULT_ATTRIBUTE_METRIC) - assert isinstance(metric, float) - return metric - - def get_mu_coefs(self, starting_point): - # for the non stationary model gives the mu1 parameters that was fitted - estimator = self.get_estimator(starting_point) - margin_function = estimator.function_from_fit # type: LinearMarginFunction - assert isinstance(margin_function, LinearMarginFunction) - mu_coefs = [margin_function.mu_intercept, margin_function.mu1_temporal_trend] - if self.has_spatial_coordinates: - mu_coefs += [margin_function.mu_longitude_trend, margin_function.mu_latitude_trend] - return dict(zip(self.mu_coef_names, mu_coefs)) - - @property - def mu_coef_names(self): - mu_coef_names = ['mu_intercept', 'mu_temporal'] - if self.has_spatial_coordinates: - mu_coef_names += ['mu_longitude', 'mu_latitude'] - return mu_coef_names - - @property - def has_spatial_coordinates(self): - return self.dataset.coordinates.has_spatial_coordinates - - @property - def mu_coef_colors(self): - return ['b', 'c', 'g', 'y', ] - - def visualize(self, ax, complete_analysis=True): - years = self.years(complete_analysis) - - # Load the estimator only once - if self.multiprocessing: - with Pool(self.nb_cores) as p: - stationary_estimator, *non_stationary_estimators = p.map(self.get_estimator, [None] + years) - else: - stationary_estimator = self.get_estimator(None) - non_stationary_estimators = [self.get_estimator(year) for year in years] - self._starting_point_to_estimator[None] = stationary_estimator - for year, non_stationary_estimator in zip(years, non_stationary_estimators): - self._starting_point_to_estimator[year] = non_stationary_estimator - - # Plot differences - stationary_metric, *non_stationary_metrics = [self.get_metric(starting_point) for starting_point in - [None] + years] - difference = [m - stationary_metric for m in non_stationary_metrics] - color_difference = 'r' - label_difference = self.RESULT_ATTRIBUTE_METRIC + ' difference' - ax.plot(years, difference, color_difference + 'x-', label=label_difference) - ax.set_ylabel(label_difference, color=color_difference, ) - - # Plot zero line - # years_line = [years[0] -10, years[-1] + 10] - # ax.plot(years, [0 for _ in years], 'kx-', label='zero line') - # Plot significative line corresponding to 0.05 relevance - alpha = 0.05 - significative_deviance = chi2.ppf(q=1 - alpha, df=1) - ax.plot(years, [significative_deviance for _ in years], 'mx-', label='significative line') - - all_deviance_data = [significative_deviance] + difference - min_deviance_data, max_deviance_data = min(all_deviance_data), max(all_deviance_data) - - # Plot the mu1 parameter - mu_trends = [self.get_mu_coefs(starting_point=year) for year in years] - mus = {mu_coef_name: [mu_trend[mu_coef_name] for mu_trend in mu_trends] for mu_coef_name in self.mu_coef_names} - - ax2 = ax.twinx() - - for j, (mu_coef_name, mu_coef_values) in enumerate(mus.items()): - color_mu_coef = self.mu_coef_colors[j] - if self.verbose: - print(mu_coef_name, mu_coef_values) - ax2.plot(years, mu_coef_values, color_mu_coef + 'o-', label=mu_coef_name) - # ax2.set_ylabel(mu_coef_name, color=color_mu_coef) - - df_mus = pd.DataFrame(mus) - min_mus, max_mus = df_mus.min().min(), df_mus.max().max() - min_global, max_global = min(min_deviance_data, min_mus), max(max_deviance_data, max_mus) - # ax2.set_ylim(min_global, max_global) - # if min_mus < 0.0 < max_mus: - # align_yaxis_on_zero(ax2, ax) - - ax.set_title(self.display_name) - ax.set_xlabel('starting year for the linear trend of {}'.format(self.mu_coef_names[-1])) - ax.grid() - - prop = {'size': 5} if not self.has_spatial_coordinates else None - ax.legend(loc=6, prop=prop) - ax2.legend(loc=7, prop=prop) - - def years(self, complete_analysis=True): - # Define the year_min and year_max for the starting point - if complete_analysis: - year_min, year_max, step = 1960, 1990, 1 - OptimizationConstants.USE_MAXIT = True - else: - year_min, year_max, step = 1960, 1990, 5 - years = list(range(year_min, year_max + 1, step)) - return years - - @property - def display_name(self): - raise NotImplementedError - - -class IndependenceLocationTrendTest(AbstractNonStationaryTrendTest): - - def __init__(self, station_name, *args, **kwargs): - super().__init__(*args, **kwargs, - estimator_class=LinearMarginEstimator, - stationary_margin_model_class=StationaryTemporalModel, - non_stationary_margin_model_class=NonStationaryLocationTemporalModel) - self.station_name = station_name - - @property - def display_name(self): - return self.station_name - - -class ConditionalIndedendenceLocationTrendTest(AbstractNonStationaryTrendTest): - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs, - estimator_class=LinearMarginEstimator, - stationary_margin_model_class=LinearStationaryMarginModel, - non_stationary_margin_model_class=LinearNonStationaryLocationMarginModel) - - @property - def display_name(self): - return 'conditional independence' - - -class MaxStableLocationTrendTest(AbstractNonStationaryTrendTest): - - def __init__(self, max_stable_model, *args, **kwargs): - super().__init__(*args, **kwargs, - estimator_class=FullEstimatorInASingleStepWithSmoothMargin, - stationary_margin_model_class=LinearStationaryMarginModel, - non_stationary_margin_model_class=LinearNonStationaryLocationMarginModel) - self.max_stable_model = max_stable_model - - def _load_estimator(self, margin_model) -> AbstractEstimator: - return self.estimator_class(self.dataset, margin_model, self.max_stable_model) - - @property - def display_name(self): - return get_display_name_from_object_type(type(self.max_stable_model)) diff --git a/experiment/trend_analysis/univariate_test/abstract_univariate_test.py b/experiment/trend_analysis/univariate_test/abstract_univariate_test.py deleted file mode 100644 index f96d590704f2ea03f0eb695bce7312da60dcdc55..0000000000000000000000000000000000000000 --- a/experiment/trend_analysis/univariate_test/abstract_univariate_test.py +++ /dev/null @@ -1,88 +0,0 @@ -import random -from collections import OrderedDict - -import matplotlib.pyplot as plt -from cached_property import cached_property -from matplotlib import colors - - -class AbstractUnivariateTest(object): - SIGNIFICATIVE = 'significative' - # 5 possible types of trends - NO_TREND = 'no trend' - ALL_TREND = 'all trend' - POSITIVE_TREND = 'positive trend' - NEGATIVE_TREND = 'negative trend' - SIGNIFICATIVE_ALL_TREND = SIGNIFICATIVE + ' ' + ALL_TREND - SIGNIFICATIVE_POSITIVE_TREND = SIGNIFICATIVE + ' ' + POSITIVE_TREND - SIGNIFICATIVE_NEGATIVE_TREND = SIGNIFICATIVE + ' ' + NEGATIVE_TREND - NON_SIGNIFICATIVE_TREND = 'non ' + SIGNIFICATIVE + ' trend' - - # this is the most common level of significance - SIGNIFICANCE_LEVEL = 0.05 - - def __init__(self, years, maxima, starting_year): - self.years = years - self.maxima = maxima - self.starting_year = starting_year - assert len(self.years) == len(self.maxima) - - @cached_property - def idx_for_starting_year(self): - return self.years.index(self.starting_year) - - @property - def years_after_starting_year(self): - return self.years[self.idx_for_starting_year:] - - @property - def maxima_after_starting_year(self): - return self.maxima[self.idx_for_starting_year:] - - @classmethod - def real_trend_types(cls): - return [cls.POSITIVE_TREND, cls.NEGATIVE_TREND, - cls.SIGNIFICATIVE_POSITIVE_TREND, cls.SIGNIFICATIVE_NEGATIVE_TREND, cls.NO_TREND] - - @classmethod - def three_main_trend_types(cls): - return [cls.SIGNIFICATIVE_NEGATIVE_TREND, cls.NON_SIGNIFICATIVE_TREND, cls.SIGNIFICATIVE_POSITIVE_TREND] - - - @classmethod - def get_display_trend_type(cls, real_trend_type): - if cls.SIGNIFICATIVE in real_trend_type: - return real_trend_type - else: - return cls.NON_SIGNIFICATIVE_TREND - - - @property - def time_derivative_of_return_level(self): - return 0.0 - - @property - def test_trend_type(self) -> str: - test_sign = self.test_sign - assert test_sign in [-1, 0, 1] - if test_sign == 0: - trend_type = self.NO_TREND - else: - trend_type = self.POSITIVE_TREND if test_sign > 0 else self.NEGATIVE_TREND - if self.is_significant: - trend_type = self.SIGNIFICATIVE + ' ' + trend_type - assert trend_type in self.real_trend_types(), trend_type - return trend_type - - @property - def test_sign(self) -> int: - raise NotImplementedError - - @property - def is_significant(self) -> bool: - raise NotImplementedError - - - - - diff --git a/experiment/trend_analysis/univariate_test/extreme_trend_test/abstract_gev_trend_test.py b/experiment/trend_analysis/univariate_test/extreme_trend_test/abstract_gev_trend_test.py index c518996a81d103a48239a31fb60d0243ae6c5a1d..7baee1ace27950078e764f51f0520e25e48f93e6 100644 --- a/experiment/trend_analysis/univariate_test/extreme_trend_test/abstract_gev_trend_test.py +++ b/experiment/trend_analysis/univariate_test/extreme_trend_test/abstract_gev_trend_test.py @@ -7,7 +7,6 @@ from scipy.stats import chi2 from experiment.eurocode_data.utils import EUROCODE_QUANTILE, YEAR_OF_INTEREST_FOR_RETURN_LEVEL from experiment.meteo_france_data.scm_models_data.crocus.crocus_variables import AbstractSnowLoadVariable -from experiment.trend_analysis.univariate_test.abstract_univariate_test import AbstractUnivariateTest from experiment.trend_analysis.univariate_test.utils import load_temporal_coordinates_and_dataset, \ fitted_linear_margin_estimator from extreme_fit.distribution.gev.gev_params import GevParams @@ -20,9 +19,10 @@ from root_utils import classproperty from spatio_temporal_dataset.coordinates.abstract_coordinates import AbstractCoordinates -class AbstractGevTrendTest(AbstractUnivariateTest): +class AbstractGevTrendTest(object): RRunTimeError_TREND = 'R RunTimeError trend' nb_years_for_quantile_evolution = 10 + SIGNIFICANCE_LEVEL = 0.05 def __init__(self, years, maxima, starting_year, unconstrained_model_class, constrained_model_class=StationaryTemporalModel, @@ -58,19 +58,6 @@ class AbstractGevTrendTest(AbstractUnivariateTest): except SafeRunException: self.crashed = True - # Type of trends - - @classmethod - def real_trend_types(cls): - return super().real_trend_types() + [cls.RRunTimeError_TREND] - - @property - def test_trend_type(self) -> str: - if self.crashed: - return self.RRunTimeError_TREND - else: - return super().test_trend_type - # Likelihood ratio test @property diff --git a/experiment/trend_analysis/univariate_test/univariate_test_results.py b/experiment/trend_analysis/univariate_test/univariate_test_results.py deleted file mode 100644 index a319c72e37b201c5b26d247029ad8cc066f567a8..0000000000000000000000000000000000000000 --- a/experiment/trend_analysis/univariate_test/univariate_test_results.py +++ /dev/null @@ -1,44 +0,0 @@ -from multiprocessing.pool import Pool - -import numpy as np - -from experiment.trend_analysis.univariate_test.extreme_trend_test.abstract_gev_trend_test import AbstractGevTrendTest -from extreme_fit.model.margin_model.linear_margin_model.abstract_temporal_linear_margin_model import \ - TemporalMarginFitMethod -from root_utils import NB_CORES - - -def compute_gev_change_point_test_result(smooth_maxima, starting_year, trend_test_class, years, fit_method=TemporalMarginFitMethod.is_mev_gev_fit): - trend_test = trend_test_class(years, smooth_maxima, starting_year) # type: AbstractGevTrendTest - assert isinstance(trend_test, AbstractGevTrendTest) - return trend_test.test_trend_type, \ - trend_test.time_derivative_of_return_level, \ - trend_test.unconstained_nllh, \ - trend_test.test_trend_constant_quantile, \ - trend_test.mean_difference_same_sign_as_slope_strenght, \ - trend_test.variance_difference_same_sign_as_slope_strenght, \ - trend_test.unconstrained_model_deviance, \ - trend_test.constrained_model_deviance - - -def compute_gev_change_point_test_results(multiprocessing, maxima, starting_years, trend_test_class, - years): - if multiprocessing: - list_args = [(maxima, starting_year, trend_test_class, years) for starting_year in - starting_years] - with Pool(NB_CORES) as p: - trend_test_res = p.starmap(compute_gev_change_point_test_result, list_args) - else: - trend_test_res = [ - compute_gev_change_point_test_result(maxima, starting_year, trend_test_class, years) - for starting_year in starting_years] - # Keep only the most likely starting year - # (i.e. the starting year that minimizes its negative log likelihood) - # (set all the other data to np.nan so that they will not be taken into account in mean function) - best_idx = list(np.argmin(trend_test_res, axis=0))[2] - # print(best_idx, trend_test_res) - best_idxs = [best_idx] - # todo: by doing a sorting on the deviance, I could get the nb_top_likelihood_values values - # best_idxs = list(np.argmax(trend_test_res, axis=0))[-nb_top_likelihood_values:] - - return trend_test_res, best_idxs diff --git a/projects/exceeding_snow_loads/study_visualizer_for_non_stationary_trends.py b/projects/exceeding_snow_loads/study_visualizer_for_non_stationary_trends.py index 63fa23fdf3ef71d786e3071b68e8613f2a2aba03..bd44ceb527a672b4e40aeb32531a84146d7097a6 100644 --- a/projects/exceeding_snow_loads/study_visualizer_for_non_stationary_trends.py +++ b/projects/exceeding_snow_loads/study_visualizer_for_non_stationary_trends.py @@ -18,7 +18,6 @@ from experiment.meteo_france_data.scm_models_data.visualization.study_visualizer from projects.exceeding_snow_loads.check_mcmc_convergence_for_return_levels.gelman_convergence_test import \ compute_gelman_convergence_value from projects.exceeding_snow_loads.paper_utils import ModelSubsetForUncertainty, NON_STATIONARY_TREND_TEST_PAPER -from experiment.trend_analysis.abstract_score import MeanScore from experiment.trend_analysis.univariate_test.extreme_trend_test.abstract_gev_trend_test import AbstractGevTrendTest from experiment.trend_analysis.univariate_test.extreme_trend_test.trend_test_one_parameter.gumbel_trend_test_one_parameter import \ GumbelLocationTrendTest, GevStationaryVersusGumbel, GumbelScaleTrendTest, GumbelVersusGumbel @@ -41,7 +40,6 @@ class StudyVisualizerForNonStationaryTrends(StudyVisualizer): vertical_kde_plot=False, year_for_kde_plot=None, plot_block_maxima_quantiles=False, temporal_non_stationarity=False, transformation_class=None, verbose=False, multiprocessing=False, complete_non_stationary_trend_analysis=False, normalization_under_one_observations=True, - score_class=MeanScore, uncertainty_methods=None, model_subsets_for_uncertainty=None, uncertainty_massif_names=None, @@ -56,7 +54,7 @@ class StudyVisualizerForNonStationaryTrends(StudyVisualizer): super().__init__(study, show, save_to_file, only_one_graph, only_first_row, vertical_kde_plot, year_for_kde_plot, plot_block_maxima_quantiles, temporal_non_stationarity, transformation_class, verbose, multiprocessing, complete_non_stationary_trend_analysis, - normalization_under_one_observations, score_class) + normalization_under_one_observations) # Add some attributes self.fit_only_time_series_with_ninety_percent_of_non_null_values = fit_only_time_series_with_ninety_percent_of_non_null_values self.fit_gev_only_on_non_null_maxima = fit_gev_only_on_non_null_maxima diff --git a/test/test_experiment/test_SCM_study.py b/test/test_experiment/test_SCM_study.py index c17c2e6ebe0c0456592d9b40dd48528941bf15fd..bed06ac851910949f77ca0c23dfef475ee010329 100644 --- a/test/test_experiment/test_SCM_study.py +++ b/test/test_experiment/test_SCM_study.py @@ -19,15 +19,6 @@ from root_utils import get_display_name_from_object_type class TestSCMAllStudy(unittest.TestCase): - def test_extended_run(self): - for study_class in [ExtendedSafranSnowfall]: - for study in study_iterator(study_class, only_first_one=True, verbose=False): - study_visualizer = StudyVisualizer(study, show=False, save_to_file=False, multiprocessing=True) - study_visualizer.df_trend_spatio_temporal(GevLocationTrendTest, [1959, 1960, 1961], - nb_massif_for_change_point_test=3, - sample_one_massif_from_each_region=False) - self.assertTrue(True) - def test_instantiate_studies(self): nb_sample = 2 for nb_days in sample(set(NB_DAYS), k=nb_sample): diff --git a/test/test_experiment/test_coordinate_sensitivity.py b/test/test_experiment/test_coordinate_sensitivity.py deleted file mode 100644 index d66c1713d80119bb07e83e4f4dfbdd118c5ed5d3..0000000000000000000000000000000000000000 --- a/test/test_experiment/test_coordinate_sensitivity.py +++ /dev/null @@ -1,51 +0,0 @@ -import unittest - -from experiment.meteo_france_data.scm_models_data.crocus.crocus import CrocusSweTotal -from experiment.meteo_france_data.scm_models_data.visualization.main_study_visualizer import \ - study_iterator_global -from experiment.meteo_france_data.scm_models_data.visualization.study_visualizer import \ - StudyVisualizer -from experiment.trend_analysis.non_stationary_trends import \ - ConditionalIndedendenceLocationTrendTest -from spatio_temporal_dataset.coordinates.transformed_coordinates.transformation.uniform_normalization import \ - BetweenZeroAndOneNormalization, BetweenZeroAndOneNormalizationMinEpsilon, BetweenZeroAndOneNormalizationMaxEpsilon -from root_utils import get_display_name_from_object_type - - -class TestCoordinateSensitivity(unittest.TestCase): - DISPLAY = False - - def test_coordinate_normalization_sensitivity(self): - altitudes = [300, 600, 900, 1200, 2100, 3000][-1:] - transformation_classes = [None, BetweenZeroAndOneNormalization, BetweenZeroAndOneNormalizationMinEpsilon, - BetweenZeroAndOneNormalizationMaxEpsilon][1:2] - - study_classes = [CrocusSweTotal] - for study in study_iterator_global(study_classes, altitudes=altitudes, verbose=False): - if self.DISPLAY: - print(study.altitude) - for transformation_class in transformation_classes: - study_visualizer = StudyVisualizer(study, transformation_class=transformation_class) - study_visualizer.temporal_non_stationarity = True - trend_test = ConditionalIndedendenceLocationTrendTest(study_visualizer.dataset) - years = [1960, 1990] - mu1s = [trend_test.get_mu_coefs(year)['mu_temporal'] for year in years] - if self.DISPLAY: - print('Stationary') - stationary_est = trend_test.get_estimator(starting_point=None) - print(stationary_est.function_from_fit.coordinates.df_all_coordinates) - print(stationary_est.result_from_model_fit.convergence) - print(stationary_est.function_from_fit.coef_dict) - print('Non Stationary') - non_stationary_est = trend_test.get_estimator(starting_point=1960) - print(non_stationary_est.result_from_model_fit.convergence) - non_stationary_est = trend_test.get_estimator(starting_point=1990) - print(non_stationary_est.result_from_model_fit.convergence) - print(non_stationary_est.function_from_fit.coef_dict) - print(get_display_name_from_object_type(transformation_class), 'mu1s: ', mu1s) - print('\n') - self.assertTrue(0.0 not in mu1s) - - -if __name__ == '__main__': - unittest.main()