Commit 409fce74 authored by Le Roux Erwan's avatar Le Roux Erwan
Browse files

[SCM][TREND TEST] add mann kendall test.

parent 1d98720b
No related merge requests found
Showing with 206 additions and 10 deletions
+206 -10
...@@ -9,7 +9,7 @@ class AbstractTrendScore(object): ...@@ -9,7 +9,7 @@ class AbstractTrendScore(object):
We don't care what happen before the change point. We don't care what happen before the change point.
All we want to focus on, is the potential trend that could exist in the data after a potential change point""" All we want to focus on, is the potential trend that could exist in the data after a potential change point"""
def __init__(self, number_of_top_values) -> None: def __init__(self, number_of_top_values=None) -> None:
self.number_of_top_values = number_of_top_values self.number_of_top_values = number_of_top_values
def get_detailed_score(self, years_after_change_point, maxima_after_change_point): def get_detailed_score(self, years_after_change_point, maxima_after_change_point):
...@@ -21,13 +21,17 @@ class MannKendall(AbstractTrendScore): ...@@ -21,13 +21,17 @@ class MannKendall(AbstractTrendScore):
def get_detailed_score(self, years_after_change_point, maxima_after_change_point): def get_detailed_score(self, years_after_change_point, maxima_after_change_point):
score = 0.0 score = 0.0
for i, xi in enumerate(maxima_after_change_point[:-1]): for i, xi in enumerate(maxima_after_change_point[:-1]):
for xj in maxima_after_change_point[i+1:]: for xj in maxima_after_change_point[i + 1:]:
score += np.sign(xj - xi) score += np.sign(xj - xi)
return [score, score, score] return [score, score, score]
class SortedScore(AbstractTrendScore): class SortedScore(AbstractTrendScore):
def __init__(self, number_of_top_values=None) -> None:
super().__init__(number_of_top_values)
assert self.number_of_top_values is not None
def get_detailed_score(self, years_after_change_point, maxima_after_change_point): def get_detailed_score(self, years_after_change_point, maxima_after_change_point):
# Get sorted years and sorted maxima # Get sorted years and sorted maxima
sorted_years, sorted_maxima = zip( sorted_years, sorted_maxima = zip(
......
import random import random
import numpy as np
from experiment.meteo_france_SCM_study.mann_kendall_test import mann_kendall_test
from experiment.meteo_france_SCM_study.abstract_score import MannKendall
class AbstractTrendTest(object): class AbstractTrendTest(object):
SIGNIFICATIVE = 'significative' SIGNIFICATIVE = 'significative'
...@@ -10,6 +15,8 @@ class AbstractTrendTest(object): ...@@ -10,6 +15,8 @@ class AbstractTrendTest(object):
SIGNIFICATIVE_POSITIVE_TREND = SIGNIFICATIVE + ' ' + POSITIVE_TREND SIGNIFICATIVE_POSITIVE_TREND = SIGNIFICATIVE + ' ' + POSITIVE_TREND
SIGNIFICATIVE_NEGATIVE_TREND = SIGNIFICATIVE + ' ' + NEGATIVE_TREND SIGNIFICATIVE_NEGATIVE_TREND = SIGNIFICATIVE + ' ' + NEGATIVE_TREND
SIGNIFICANCE_LEVEL = 0.05
# todo: maybe create ordered dict # todo: maybe create ordered dict
TREND_TYPE_TO_STYLE = { TREND_TYPE_TO_STYLE = {
NO_TREND: 'k--', NO_TREND: 'k--',
...@@ -24,6 +31,11 @@ class AbstractTrendTest(object): ...@@ -24,6 +31,11 @@ class AbstractTrendTest(object):
def __init__(self, years_after_change_point, maxima_after_change_point): def __init__(self, years_after_change_point, maxima_after_change_point):
self.years_after_change_point = years_after_change_point self.years_after_change_point = years_after_change_point
self.maxima_after_change_point = maxima_after_change_point self.maxima_after_change_point = maxima_after_change_point
assert len(self.years_after_change_point) == len(self.maxima_after_change_point)
@property
def n(self):
return len(self.years_after_change_point)
@property @property
def test_trend_type(self) -> str: def test_trend_type(self) -> str:
...@@ -47,7 +59,7 @@ class AbstractTrendTest(object): ...@@ -47,7 +59,7 @@ class AbstractTrendTest(object):
raise NotImplementedError raise NotImplementedError
class ExampleTrendTest(AbstractTrendTest): class ExampleRandomTrendTest(AbstractTrendTest):
@property @property
def test_sign(self) -> int: def test_sign(self) -> int:
...@@ -59,4 +71,27 @@ class ExampleTrendTest(AbstractTrendTest): ...@@ -59,4 +71,27 @@ class ExampleTrendTest(AbstractTrendTest):
class MannKendallTrendTest(AbstractTrendTest): class MannKendallTrendTest(AbstractTrendTest):
pass
def __init__(self, years_after_change_point, maxima_after_change_point):
super().__init__(years_after_change_point, maxima_after_change_point)
score = MannKendall()
# Compute score value
detailed_score = score.get_detailed_score(years_after_change_point, maxima_after_change_point)
self.score_value = detailed_score[0]
# Compute the Mann Kendall Test
MK, S = mann_kendall_test(t=years_after_change_point,
x=maxima_after_change_point,
eps=1e-5,
alpha=self.SIGNIFICANCE_LEVEL,
Ha='upordown')
assert S == self.score_value
self.MK = MK
@property
def test_sign(self) -> int:
return np.sign(self.score_value)
@property
def is_significant(self) -> bool:
assert 'reject' in self.MK or 'accept' in self.MK
return 'accept' in self.MK
# Created: Mon Apr 17, 2017 01:18PM
# Last modified: Mon Apr 17, 2017 09:24PM
# Copyright: Bedartha Goswami <goswami@uni-potsdam.de>
# This code was adapted from: https://up-rs-esp.github.io/mkt/
import numpy as np
from scipy.special import ndtri, ndtr
import sys
def mann_kendall_test(t, x, eps=None, alpha=None, Ha=None):
"""
Runs the Mann-Kendall test for trend in time series data.
Parameters
----------
t : 1D numpy.ndarray
array of the time points of measurements
x : 1D numpy.ndarray
array containing the measurements corresponding to entries of 't'
eps : scalar, float, greater than zero
least count error of measurements which help determine ties in the data
alpha : scalar, float, greater than zero
significance level of the statistical test (Type I error)
Ha : string, options include 'up', 'down', 'upordown'
type of test: one-sided ('up' or 'down') or two-sided ('updown')
Returns
-------
MK : string
result of the statistical test indicating whether or not to accept hte
alternative hypothesis 'Ha'
m : scalar, float
slope of the linear fit to the data
c : scalar, float
intercept of the linear fit to the data
p : scalar, float, greater than zero
p-value of the obtained Z-score statistic for the Mann-Kendall test
Raises
------
AssertionError : error
least count error of measurements 'eps' is not given
AssertionError : error
significance level of test 'alpha' is not given
AssertionError : error
alternative hypothesis 'Ha' is not given
"""
# assert a least count for the measurements x
assert eps, "Please provide least count error for measurements 'x'"
assert alpha, "Please provide significance level 'alpha' for the test"
assert Ha, "Please provide the alternative hypothesis 'Ha'"
# estimate sign of all possible (n(n-1)) / 2 differences
n = len(t)
sgn = np.zeros((n, n), dtype="int")
for i in range(n):
tmp = x - x[i]
tmp[np.where(np.fabs(tmp) <= eps)] = 0.
sgn[i] = np.sign(tmp)
# estimate mean of the sign of all possible differences
S = sgn[np.triu_indices(n, k=1)].sum()
# estimate variance of the sign of all possible differences
# 1. Determine no. of tie groups 'p' and no. of ties in each group 'q'
np.fill_diagonal(sgn, eps * 1E6)
i, j = np.where(sgn == 0.)
ties = np.unique(x[i])
p = len(ties)
q = np.zeros(len(ties), dtype="int")
for k in range(p):
idx = np.where(np.fabs(x - ties[k]) < eps)[0]
q[k] = len(idx)
# 2. Determine the two terms in the variance calculation
term1 = n * (n - 1) * (2 * n + 5)
term2 = (q * (q - 1) * (2 * q + 5)).sum()
# 3. estimate variance
varS = float(term1 - term2) / 18.
# Compute the Z-score based on above estimated mean and variance
if S > eps:
Zmk = (S - 1) / np.sqrt(varS)
elif np.fabs(S) <= eps:
Zmk = 0.
elif S < -eps:
Zmk = (S + 1) / np.sqrt(varS)
# compute test based on given 'alpha' and alternative hypothesis
# note: for all the following cases, the null hypothesis Ho is:
# Ho := there is no monotonic trend
#
# Ha := There is an upward monotonic trend
if Ha == "up":
Z_ = ndtri(1. - alpha)
if Zmk >= Z_:
MK = "accept Ha := upward trend"
else:
MK = "reject Ha := upward trend"
# Ha := There is a downward monotonic trend
elif Ha == "down":
Z_ = ndtri(1. - alpha)
if Zmk <= -Z_:
MK = "accept Ha := downward trend"
else:
MK = "reject Ha := downward trend"
# Ha := There is an upward OR downward monotonic trend
elif Ha == "upordown":
Z_ = ndtri(1. - alpha / 2.)
if np.fabs(Zmk) >= Z_:
MK = "accept Ha := upward OR downward trend"
else:
MK = "reject Ha := upward OR downward trend"
# ----------
# AS A BONUS
# ----------
# # estimate the slope and intercept of the line
# m = np.corrcoef(t, x)[0, 1] * (np.std(x) / np.std(t))
# c = np.mean(x) - m * np.mean(t)
#
#
# # ----------
# # AS A BONUS
# # ----------
# # estimate the p-value for the obtained Z-score Zmk
# if S > eps:
# if Ha == "up":
# p = 1. - ndtr(Zmk)
# elif Ha == "down":
# p = ndtr(Zmk)
# elif Ha == "upordown":
# p = 0.5 * (1. - ndtr(Zmk))
# elif np.fabs(S) <= eps:
# p = 0.5
# elif S < -eps:
# if Ha == "up":
# p = 1. - ndtr(Zmk)
# elif Ha == "down":
# p = ndtr(Zmk)
# elif Ha == "upordown":
# p = 0.5 * (ndtr(Zmk))
return MK, S
from experiment.meteo_france_SCM_study.abstract_score import MannKendall, WeigthedScore, MeanScore, MedianScore from experiment.meteo_france_SCM_study.abstract_score import MannKendall, WeigthedScore, MeanScore, MedianScore
from experiment.meteo_france_SCM_study.abstract_study import AbstractStudy from experiment.meteo_france_SCM_study.abstract_study import AbstractStudy
from experiment.meteo_france_SCM_study.abstract_trend_test import MannKendallTrendTest, ExampleTrendTest from experiment.meteo_france_SCM_study.abstract_trend_test import MannKendallTrendTest, ExampleRandomTrendTest
from experiment.meteo_france_SCM_study.crocus.crocus import CrocusDepth, CrocusSwe, ExtendedCrocusDepth, \ from experiment.meteo_france_SCM_study.crocus.crocus import CrocusDepth, CrocusSwe, ExtendedCrocusDepth, \
ExtendedCrocusSwe ExtendedCrocusSwe
from experiment.meteo_france_SCM_study.safran.safran import SafranSnowfall, ExtendedSafranSnowfall, \ from experiment.meteo_france_SCM_study.safran.safran import SafranSnowfall, ExtendedSafranSnowfall, \
...@@ -45,10 +45,10 @@ def altitude_trends_significant(): ...@@ -45,10 +45,10 @@ def altitude_trends_significant():
only_first_one = False only_first_one = False
# altitudes that have 20 massifs at least # altitudes that have 20 massifs at least
altitudes = ALL_ALTITUDES[3:-6] altitudes = ALL_ALTITUDES[3:-6]
altitudes = ALL_ALTITUDES[3:5] # altitudes = ALL_ALTITUDES[3:5]
# altitudes = ALL_ALTITUDES[:2] # altitudes = ALL_ALTITUDES[:2]
for study_class in SCM_STUDIES[:1]: for study_class in SCM_STUDIES[:1]:
trend_test_classes = [ExampleTrendTest, ExampleTrendTest, MannKendallTrendTest][:2] trend_test_classes = [MannKendallTrendTest][:]
visualizers = [StudyVisualizer(study, temporal_non_stationarity=True, verbose=False) visualizers = [StudyVisualizer(study, temporal_non_stationarity=True, verbose=False)
for study in study_iterator_global(study_classes=[study_class], only_first_one=only_first_one, for study in study_iterator_global(study_classes=[study_class], only_first_one=only_first_one,
altitudes=altitudes)] altitudes=altitudes)]
......
...@@ -225,5 +225,9 @@ class AltitudeVisualizer(object): ...@@ -225,5 +225,9 @@ class AltitudeVisualizer(object):
altitude_to_serie_with_mean_percentages[altitude] = s altitude_to_serie_with_mean_percentages[altitude] = s
# Plot lines # Plot lines
for trend_type, style in AbstractTrendTest.TREND_TYPE_TO_STYLE.items(): for trend_type, style in AbstractTrendTest.TREND_TYPE_TO_STYLE.items():
percentages = [v.loc[trend_type] for v in altitude_to_serie_with_mean_percentages.values()] percentages = [v.loc[trend_type] if trend_type in v.index else 0.0
ax.plot(self.altitudes, percentages, style + marker, label=trend_type) for v in altitude_to_serie_with_mean_percentages.values()]
if set(percentages) == {0.0}:
continue
else:
ax.plot(self.altitudes, percentages, style + marker, label=trend_type)
...@@ -388,7 +388,14 @@ class StudyVisualizer(object): ...@@ -388,7 +388,14 @@ class StudyVisualizer(object):
massif_name_to_trend_test_count = self.massif_name_to_trend_test_count(trend_test_class, starting_year_to_weight) massif_name_to_trend_test_count = self.massif_name_to_trend_test_count(trend_test_class, starting_year_to_weight)
df = pd.concat(list(massif_name_to_trend_test_count.values()), axis=1, sort=False) df = pd.concat(list(massif_name_to_trend_test_count.values()), axis=1, sort=False)
df.fillna(0.0, inplace=True) df.fillna(0.0, inplace=True)
return df.mean(axis=1) df = df.mean(axis=1)
assert np.allclose(df.sum(), 100)
# Add the significant trend into the count of normal trend
if AbstractTrendTest.SIGNIFICATIVE_POSITIVE_TREND in df.columns:
df[AbstractTrendTest.POSITIVE_TREND] += df[AbstractTrendTest.SIGNIFICATIVE_POSITIVE_TREND]
if AbstractTrendTest.SIGNIFICATIVE_NEGATIVE_TREND in df.columns:
df[AbstractTrendTest.NEGATIVE_TREND] += df[AbstractTrendTest.SIGNIFICATIVE_NEGATIVE_TREND]
return df
@cached_property @cached_property
def massif_name_to_scores(self): def massif_name_to_scores(self):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment