abstract_study.py 21.3 KB
Newer Older
Le Roux Erwan's avatar
Le Roux Erwan committed
import datetime

Le Roux Erwan's avatar
Le Roux Erwan committed
from s2m.configuration import ALPS_ALLSLOPES_FOLDER, ALPS_FLAT_FOLDER, PYRENEES_FLAT_FOLDER, DATA_PATH, MAP_PATH
Le Roux Erwan's avatar
Le Roux Erwan committed
from utils import cached_property

import io
import os
import os.path as op
from collections import OrderedDict
from contextlib import redirect_stdout
from itertools import chain
from multiprocessing.pool import Pool
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd
from netCDF4 import Dataset

from s2m.abstract_variable import AbstractVariable
from s2m.dataset.coordinates.abstract_coordinates import AbstractCoordinates
from s2m.dataset.observations.annual_maxima_observations import AnnualMaxima
from s2m.utils import Season, FrenchRegion, ALTITUDES, ORIENTATIONS, SLOPES, first_day_and_last_day, date_to_str, \
    ORDERED_ALLSLOPES_ALTITUDES, ORDERED_ALLSLOPES_ORIENTATIONS, ORDERED_ALLSLOPES_SLOPES, \
    ORDERED_ALLSLOPES_MASSIFNUM, ZS_INT_MASK, ZS_INT_MASK_PYRENNES, season_to_str, ZS_INT_MASK_PYRENNES_LIST, ZS_INT_23
from utils import NB_CORES, classproperty

f = io.StringIO()
with redirect_stdout(f):
    from simpledbf import Dbf5


class AbstractStudy(object):
    """
    A Study is defined by:
        - a variable class that correspond to the meteorogical quantity of interest
        - an altitude of interest
        - a start and a end year

    Les fichiers netcdf de SAFRAN et CROCUS sont autodocumentés (on peut les comprendre avec ncdump -h notamment).
    """

    def __init__(self, variable_class: type, altitude: int = 1800,
                 year_min=None, year_max=None,
                 multiprocessing=True, orientation=None, slope=20.0,
                 season=Season.annual,
                 french_region=FrenchRegion.alps,
                 split_years=None):
        assert isinstance(altitude, int), type(altitude)
        assert altitude in ALTITUDES, altitude
        self.french_region = french_region
        self.altitude = altitude
        self.model_name = None
        self.variable_class = variable_class
        # Add some attributes, for the "allslopes" reanalysis
        assert orientation is None or orientation in ORIENTATIONS
        assert slope in SLOPES
        self.orientation = orientation
        self.slope = slope

        if year_min is None:
            year_min = self.YEAR_MIN
        if year_max is None:
            year_max = self.YEAR_MAX
        assert self.YEAR_MIN <= year_min <= year_max <= self.YEAR_MAX
        self.year_min = year_min
        self.year_max = year_max
        self.multiprocessing = multiprocessing
        self.season = season
        if split_years is None:
            split_years = list(range(year_min, year_max + 1))
        self.split_years = set(split_years)

        # Add some cache for computation
        self._cache_for_pointwise_fit = {}
        self._massif_names_for_cache = None

    """ Time """

    @cached_property
    def year_to_first_index_and_last_index(self):
        year_to_first_index_and_last_index = OrderedDict()
        first_day, last_day = first_day_and_last_day(self.season)
        for year, all_days in self.year_to_all_days.items():
            year_first_index = year - 1 if self.season is not Season.spring else year
            year_last_index = year - 1 if self.season is Season.automn else year
            first_index = all_days.index('{}-{}'.format(year_first_index, first_day))
            last_index = all_days.index('{}-{}'.format(year_last_index, last_day))
            year_to_first_index_and_last_index[year] = (first_index, last_index)
        return year_to_first_index_and_last_index

    @cached_property
    def year_to_days(self) -> OrderedDict:
        year_to_days = OrderedDict()
        for year, (start_index, last_index) in self.year_to_first_index_and_last_index.items():
            year_to_days[year] = self.year_to_all_days[year][start_index:last_index + 1]
        return year_to_days

    @cached_property
    def year_to_all_days(self) -> OrderedDict:
        # Map each year to the 'days since year-08-01 06:00:00'
        year_to_days = OrderedDict()
        for year in self.ordered_years:
            # Load days for the full year
            date = datetime.datetime(year=year - 1, month=8, day=1, hour=6, minute=0, second=0)
            days = []
            for i in range(366):
                day = date_to_str(date)
                days.append(day)
                date += datetime.timedelta(days=1)
                if date.month == 8 and date.day == 1:
                    break
            year_to_days[year] = days
        return year_to_days

    @property
    def all_days(self):
        return list(chain(*list(self.year_to_days.values())))

    @property
    def all_daily_series(self) -> np.ndarray:
        """Return an array of approximate shape (total_number_of_days, 23) x """
        all_daily_series = np.concatenate([time_serie_array
                                           for time_serie_array in self.year_to_daily_time_serie_array.values()])
        assert len(all_daily_series) == len(self.all_days)
        return all_daily_series

    """ Annual maxima """

    @property
    def observations_annual_maxima(self) -> AnnualMaxima:
        return AnnualMaxima(df_maxima_gev=pd.DataFrame(self.year_to_annual_maxima, index=self.study_massif_names))

    @cached_property
    def observations_annual_mean(self) -> pd.DataFrame:
        return pd.DataFrame(self.year_to_annual_mean, index=self.study_massif_names)

    def annual_maxima_and_years(self, massif_name) -> Tuple[np.ndarray, np.ndarray]:
        df = self.observations_annual_maxima.df_maxima_gev
        return df.loc[massif_name].values, np.array(df.columns)

    @cached_property
    def year_to_annual_maxima(self) -> OrderedDict:
        # Map each year to an array of size nb_massif
        year_to_annual_maxima = OrderedDict()
        for year, time_serie in self._year_to_max_daily_time_serie.items():
            year_to_annual_maxima[year] = time_serie.max(axis=0)
        return year_to_annual_maxima

    @cached_property
    def year_to_annual_mean(self) -> OrderedDict:
        # Map each year to an array of size nb_massif
        year_to_annual_mean = OrderedDict()
        for year, time_serie in self._year_to_max_daily_time_serie.items():
            year_to_annual_mean[year] = time_serie.mean(axis=0)
        return year_to_annual_mean

    @cached_property
    def year_to_annual_maxima_index(self) -> OrderedDict:
        # Map each year to an array of size nb_massif
        year_to_annual_maxima = OrderedDict()
        for year, time_serie in self._year_to_max_daily_time_serie.items():
            year_to_annual_maxima[year] = time_serie.argmax(axis=0)
        return year_to_annual_maxima

    @cached_property
    def massif_name_to_annual_maxima_index(self):
        massif_name_to_annual_maxima_index = OrderedDict()
        for i, massif_name in enumerate(self.study_massif_names):
            index = [self.year_to_annual_maxima_index[year][i] for year in self.ordered_years]
            massif_name_to_annual_maxima_index[massif_name] = index
        return massif_name_to_annual_maxima_index

    @cached_property
    def massif_name_to_annual_maxima_angle(self):
        normalization_denominator = [366 if year % 4 == 0 else 365 for year in self.ordered_years]
        massif_name_to_annual_maxima_angle = OrderedDict()
        for massif_name, annual_maxima_index in self.massif_name_to_annual_maxima_index.items():
            angle = 2 * np.pi * np.array(annual_maxima_index) / np.array(normalization_denominator)
            massif_name_to_annual_maxima_angle[massif_name] = angle
        return massif_name_to_annual_maxima_angle

    @cached_property
    def massif_name_to_annual_maxima(self):
        massif_name_to_annual_maxima = OrderedDict()
        for i, massif_name in enumerate(self.study_massif_names):
            maxima = np.array([self.year_to_annual_maxima[year][i] for year in self.ordered_years])
            massif_name_to_annual_maxima[massif_name] = maxima
        return massif_name_to_annual_maxima

    @cached_property
    def massif_name_to_daily_time_series(self):
        massif_name_to_daily_time_series = OrderedDict()
        for i, massif_name in enumerate(self.study_massif_names):
            a = [self.year_to_daily_time_serie_array[year][:, i] for year in self.ordered_years]
            daily_time_series = np.array(list(chain.from_iterable(a)))
            massif_name_to_daily_time_series[massif_name] = daily_time_series
        return massif_name_to_daily_time_series

    @cached_property
    def massif_name_to_annual_maxima_ordered_years(self):
        massif_name_to_annual_maxima_ordered_years = OrderedDict()
        for massif_name in self.study_massif_names:
            maxima = self.massif_name_to_annual_maxima[massif_name]
            annual_maxima_ordered_index = np.argsort(maxima)
            annual_maxima_ordered_years = [self.ordered_years[idx] for idx in annual_maxima_ordered_index]
            massif_name_to_annual_maxima_ordered_years[massif_name] = annual_maxima_ordered_years
        return massif_name_to_annual_maxima_ordered_years

    """ Annual total """

    @property
    def df_annual_total(self) -> pd.DataFrame:
        return pd.DataFrame(self.year_to_annual_total, index=self.study_massif_names).transpose()

    def annual_aggregation_function(self, *args, **kwargs):
        raise NotImplementedError()

    @cached_property
    def year_to_annual_total(self) -> OrderedDict:
        # Map each year to an array of size nb_massif
        year_to_annual_mean = OrderedDict()
        for year, time_serie in self._year_to_daily_time_serie_array.items():
            year_to_annual_mean[year] = self.apply_annual_aggregation(time_serie)
        return year_to_annual_mean

    @cached_property
    def massif_name_to_annual_total(self):
        # Map each massif to an array of size nb_years
        massif_name_to_annual_total = OrderedDict()
        for i, massif_name in enumerate(self.study_massif_names):
            maxima = np.array([self.year_to_annual_total[year][i] for year in self.ordered_years])
            massif_name_to_annual_total[massif_name] = maxima
        return massif_name_to_annual_total

    def apply_annual_aggregation(self, time_serie):
        return self.annual_aggregation_function(time_serie, axis=0)

    """ Load daily observations """

    @cached_property
    def year_to_daily_time_serie_array(self) -> OrderedDict:
        return self._year_to_daily_time_serie_array

    @property
    def _year_to_max_daily_time_serie(self) -> OrderedDict:
        return self._year_to_daily_time_serie_array

    @property
    def _year_to_daily_time_serie_array(self) -> OrderedDict:
        # Map each year to a matrix of size 365-nb_days_consecutive+1 x nb_massifs
        year_to_daily_time_serie_array = OrderedDict()
        for year in self.ordered_years:
            # Check daily data
            daily_time_serie = self.daily_time_series(year)
            # Filter only the data corresponding:
            # 1: to treturnhe start_index and last_index of the season
            # 2: to the massifs for the altitude of interest
            assert daily_time_serie.shape == (len(self.year_to_days[year]), len(self.study_massif_names))
            year_to_daily_time_serie_array[year] = daily_time_serie
        return year_to_daily_time_serie_array

    def daily_time_series(self, year):
        daily_time_serie = self.year_to_variable_object[year].daily_time_serie_array
        nb_days = daily_time_serie.shape[0]
        assert nb_days == 365 or (nb_days == 366 and year % 4 == 0)
        assert daily_time_serie.shape[1] == len(self.column_mask)
        first_index, last_index = self.year_to_first_index_and_last_index[year]
        daily_time_serie = daily_time_serie[first_index:last_index + 1, self.column_mask]
        return daily_time_serie

    """ Load Variables and Datasets """

    @cached_property
    def year_to_variable_object(self) -> OrderedDict:
        # Map each year to the variable array
        path_files, ordered_years = self.ordered_years_and_path_files
        return self.efficient_variable_loading(ordered_years, path_files, multiprocessing=self.multiprocessing)

    def efficient_variable_loading(self, ordered_years, arguments, multiprocessing):
        if multiprocessing:
            with Pool(NB_CORES) as p:
                variables = p.map(self.load_variable_object, arguments)
        else:
            variables = [self.load_variable_object(argument) for argument in arguments]
        return OrderedDict(zip(ordered_years, variables))

    def instantiate_variable_object(self, variable_array) -> AbstractVariable:
        return self.variable_class(variable_array)

    def load_variable_array(self, dataset):
        return np.array(dataset.variables[self.load_keyword()])

    def load_variable_object(self, path_file):
        dataset = Dataset(path_file)
        variable_array = self.load_variable_array(dataset)
        return self.instantiate_variable_object(variable_array)

    def load_keyword(self):
        return self.variable_class.keyword()

    @property
    def year_to_dataset_ordered_dict(self) -> OrderedDict:
        print('This code is quite long... '
              'You should consider year_to_variable which is way faster when multiprocessing=True')
        # Map each year to the correspond netCDF4 Dataset
        path_files, ordered_years = self.ordered_years_and_path_files
        datasets = [Dataset(path_file) for path_file in path_files]
        return OrderedDict(zip(ordered_years, datasets))

    @cached_property
    def ordered_years_and_path_files(self):
        nc_files = [(int(f.split('_')[-2][:4]) + 1, f) for f in os.listdir(self.study_full_path) if f.endswith('.nc')]
        assert op.exists(self.study_full_path)
Le Roux Erwan's avatar
Le Roux Erwan committed
        assert len(nc_files) > 0
Le Roux Erwan's avatar
Le Roux Erwan committed
        ordered_years, path_files = zip(*[(year, op.join(self.study_full_path, nc_file))
                                          for year, nc_file in sorted(nc_files, key=lambda t: t[0])
                                          if (self.year_min <= year <= self.year_max)
                                          and (year in self.split_years)])
        return path_files, ordered_years

    """ Temporal properties """

    @property
    def nb_years(self):
        return len(self.ordered_years)

    @property
    def ordered_years(self):
        return self.ordered_years_and_path_files[1]

    @property
    def start_year_and_stop_year(self) -> Tuple[int, int]:
        ordered_years = self.ordered_years
        return ordered_years[0], ordered_years[-1]

    """ Spatial properties """

    @property
    def study_massif_names(self) -> List[str]:
        # Massif names that are present in the current study (i.e. for the current altitude)
        return self.altitude_to_massif_names[self.altitude]

    @property
    def df_latitude_longitude(self):
        any_ordered_dict = list(self.year_to_dataset_ordered_dict.values())[0]
        longitude = np.array(any_ordered_dict.variables['LON'])[self.flat_mask]
        latitude = np.array(any_ordered_dict.variables['LAT'])[self.flat_mask]
        data = [longitude, latitude]
        df = pd.DataFrame(data=data, index=['Longitude', 'Latitude'], columns=self.study_massif_names).transpose()
        return df

    @property
    def column_mask(self):
        return self.allslopes_mask if self.has_orientation else self.flat_mask

    @property
    def allslopes_mask(self):
        altitude_mask = np.array(ORDERED_ALLSLOPES_ALTITUDES) == self.altitude
        orientation_mask = np.array(ORDERED_ALLSLOPES_ORIENTATIONS) == self.orientation
        slope_mask = np.array(ORDERED_ALLSLOPES_SLOPES) == self.slope
        allslopes_mask = altitude_mask & orientation_mask & slope_mask
        # Exclude all the data corresponding to the 24th massif
        massif_24_mask = np.array(ORDERED_ALLSLOPES_MASSIFNUM) == 30
        return allslopes_mask & ~massif_24_mask

    @cached_property
    def flat_mask(self):
        if self.french_region is FrenchRegion.alps:
            altitude_mask = ZS_INT_MASK == self.altitude
        elif self.french_region is FrenchRegion.pyrenees:
            altitude_mask = ZS_INT_MASK_PYRENNES == self.altitude
        else:
            raise ValueError('{}'.format(self.french_region))
        assert np.sum(altitude_mask) == len(self.altitude_to_massif_names[self.altitude])
        return altitude_mask

    """ Path properties """

    @property
    def variable_name(self):
        return self.variable_class.NAME + ' ({})'.format(self.variable_unit)

    @property
    def variable_unit(self):
        return self.variable_class.UNIT


    """ 
    CLASS ATTRIBUTES COMMON TO ALL OBJECTS 
    (written as object attributes/methods for simplicity)
    """

    """ Path properties """

    @classproperty
    def data_path(self) -> str:
        return DATA_PATH

    @classproperty
    def map_full_path(self) -> str:
Le Roux Erwan's avatar
Le Roux Erwan committed
        return
Le Roux Erwan's avatar
Le Roux Erwan committed

    @property
    def study_full_path(self) -> str:
        assert self.model_name in ['Safran', 'Crocus']
        study_folder = 'meteo' if self.model_name is 'Safran' else 'pro'
        return op.join(self.reanalysis_path, study_folder)

    @property
    def reanalysis_path(self):
        reanalysis_folder = self.reanalysisfolder
        assert len(reanalysis_folder) > 0, 'please specify the name of reanalysis folder in configuration.py'
        return op.join(self.data_path, reanalysis_folder)

    @property
    def reanalysisfolder(self):
        if self.french_region is FrenchRegion.alps:
            if self.has_orientation:
                reanalysis_folder = ALPS_ALLSLOPES_FOLDER
            else:
                reanalysis_folder = ALPS_FLAT_FOLDER
        elif self.french_region is FrenchRegion.pyrenees and not self.has_orientation:
            reanalysis_folder = PYRENEES_FLAT_FOLDER
        else:
            raise ValueError(
                'french_region = {}, has_orientation = {}'.format(self.french_region, self.has_orientation))
        return reanalysis_folder

    @property
    def YEAR_MIN(self):
        return 1959

    @property
    def YEAR_MAX(self):
        nb_files_list = [len(os.listdir(op.join(self.reanalysis_path, model_name))) for model_name in ['meteo', 'pro']]
        nb_files_list = [n for n in nb_files_list if n > 0]
        assert len(nb_files_list) > 0, 'please download some files for {}'.format(self.reanalysisfolder)
        if len(nb_files_list) == 2:
            assert nb_files_list[0] == nb_files_list[1]
        return self.YEAR_MIN + nb_files_list[0]

    @property
    def dbf_filename(self) -> str:
        if self.french_region is FrenchRegion.alps:
            return 'massifs_alpes'
        elif self.french_region is FrenchRegion.pyrenees:
            return 'massifs_pyrenees'
        else:
            raise ValueError('{}'.format(self.french_region))

    @property
    def has_orientation(self):
        return self.orientation is not None

    @property
    def season_name(self):
        return season_to_str(self.season)

    """  Spatial properties """

    @cached_property
    def massif_name_to_massif_id(self):
        return {name: i for i, name in enumerate(self.study_massif_names)}

    @property
    def dbf_filename(self):
        if self.french_region is FrenchRegion.alps:
            return 'massifs_alpes'
        else:
            raise NotImplementedError

    @property
    def all_massif_names(self) -> List[str]:
        """
        Pour l'identification des massifs, le numéro de la variable massif_num correspond à celui de l'attribut num_opp
        """
        if ALPS_FLAT_FOLDER in self.reanalysis_path or ALPS_ALLSLOPES_FOLDER in self.reanalysis_path:
            french_region = FrenchRegion.alps
            key = 'num_opp'
        else:
            french_region = FrenchRegion.pyrenees
            key = 'massif_num'

        metadata_path = op.join(self.data_path, 'metadata')
        dbf = Dbf5(op.join(metadata_path, '{}.dbf'.format(self.dbf_filename)))
        df = dbf.to_dataframe().copy()  # type: pd.DataFrame
        dbf.f.close()
        # Important part (for the alps & pyrenees all data is order from the smaller massif number to the bigger)
        df.sort_values(by=key, inplace=True)
        all_massif_names = list(df['nom'])
        # Correct a massif name
        if french_region is FrenchRegion.alps:
            all_massif_names[all_massif_names.index('Beaufortin')] = 'Beaufortain'
        return all_massif_names

    @cached_property
    def massif_name_to_altitudes(self) -> Dict[str, List[int]]:
        zs = ZS_INT_23 if self.french_region is FrenchRegion.alps else ZS_INT_MASK_PYRENNES_LIST
        s = zs + [0]
        zs_list = []
        zs_all_list = []
        for a, b in zip(s[:-1], s[1:]):
            zs_list.append(a)
            if a > b:
                zs_all_list.append(zs_list)
                zs_list = []
        all_massif_names = self.all_massif_names
        return OrderedDict(zip(all_massif_names, zs_all_list))

    @cached_property
    def altitude_to_massif_names(self) -> Dict[int, List[str]]:
        altitude_to_massif_names = {altitude: [] for altitude in ALTITUDES}
        for massif_name in self.massif_name_to_altitudes.keys():
            for altitude in self.massif_name_to_altitudes[massif_name]:
                altitude_to_massif_names[altitude].append(massif_name)
        # massif_names are ordered in the same way as all_massif_names
        return altitude_to_massif_names

Le Roux Erwan's avatar
Le Roux Erwan committed
    @property
    def csv_file(self):
        if self.french_region is FrenchRegion.alps:
            return 'massifsalpes.csv'
        else:
            raise NotImplementedError