ObjectBased.py 7.45 KiB
import glob

import numpy as np
import pandas as pd

from OBIA.OBIABase import *
from sklearn.model_selection import StratifiedGroupKFold, GroupShuffleSplit
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, accuracy_score, cohen_kappa_score, precision_recall_fscore_support

class ObjectBasedClassifier:
    def __init__(self, object_layer, time_series_list, user_feature_list,
                 reference_data=None, ref_class_field='class', ref_id_field='id'):
        self.obia_base = OBIABase(object_layer, ref_data=reference_data, ref_class_field=ref_class_field,
                                  ref_id_field=ref_id_field)
        for lst in time_series_list:
            self.obia_base.add_raster_time_series_for_stats(lst)
        for ras in user_feature_list:
            self.obia_base.add_raster_for_stats(ras)
        if reference_data is not None:
            self.obia_base.populate_ref_db()
            self.training_base = self.obia_base.get_reference_db_as_training_base(class_field=ref_class_field)
            self.training_base['folds'] = []
        return

    def gen_k_folds(self, k, class_field='class'):
        sgk = StratifiedGroupKFold(n_splits=k, shuffle=True)
        for tr_i, ts_i in sgk.split(self.training_base['X'],
                                    self.training_base[class_field],
                                    self.training_base['groups']):
            self.training_base['folds'].append((tr_i, ts_i))
        # check if all classes are in all splits
        n_classes = len(np.unique(self.training_base[class_field]))
        ok = True
        for f in self.training_base['folds']:
            ok &= (len(np.unique(self.training_base[class_field][f[0]])) == n_classes and
                   len(np.unique(self.training_base[class_field][f[1]])) == n_classes)
        if not ok:
            self.training_base['folds'] = []
            raise Exception("Not all classes are present in each fold/split.\n"
                            "Please check that you have enough groups (e.g. 2 x n_folds) per class.")
        return

    def gen_hold_out(self, test_train_ratio=0.5, n_splits=1, class_field='class'):
        gss = GroupShuffleSplit(n_splits=n_splits, test_size=test_train_ratio)
        for tr_i, ts_i in gss.split(self.training_base['X'],
                                    self.training_base[class_field],
                                    self.training_base['groups']):
            self.training_base['folds'].append((tr_i, ts_i))
        # check if all classes are in all splits
        n_classes = len(np.unique(self.training_base[class_field]))
        ok = True
        for f in self.training_base['folds']:
            ok &= (len(np.unique(self.training_base[class_field][f[0]])) == n_classes and
                   len(np.unique(self.training_base[class_field][f[1]])) == n_classes)
        if not ok:
            self.training_base['folds'] = []
            raise Exception("Not all classes are present in each split.\n"
                            "Please check that you have enough groups (e.g. 2 x (1/test_train_ratio)) per class.")

    def train_RF(self, n_estimators, class_field='class', return_true_vs_pred=False):
        assert('folds' in self.training_base.keys())
        models = []
        results = []
        yt_yp = []
        for tr_i, ts_i in tqdm(self.training_base['folds'], desc='Training'):
            models.append(RandomForestClassifier(n_estimators=n_estimators))
            models[-1].fit(self.training_base['X'][tr_i], self.training_base[class_field][tr_i])
            l, c = self.training_base['obj_id'][ts_i], models[-1].predict(self.training_base['X'][ts_i])
            y_true, y_pred = self.obia_base.true_pred_bypixel(l, c, class_field)
            results.append(
                {
                    'conf_matrix': confusion_matrix(y_true, y_pred),
                    'accuracy': accuracy_score(y_true, y_pred),
                    'kappa' : cohen_kappa_score(y_true, y_pred),
                    'p_r_f1': precision_recall_fscore_support(y_true, y_pred, zero_division=0),
                    'importances' : models[-1].feature_importances_
                }
            )
            if return_true_vs_pred:
                results[-1]['true_vs_pred'] = (y_true, y_pred)

        all_imp = np.vstack([x['importances'] for x in results])
        summary = {
            'accuracy_mean': np.mean([x['accuracy'] for x in results]),
            'accuracy_std': np.std([x['accuracy'] for x in results]),
            'kappa_mean': np.mean([x['kappa'] for x in results]),
            'kappa_std': np.std([x['kappa'] for x in results]),
            'prec_mean': np.mean([x['p_r_f1'][0] for x in results], axis=0),
            'prec_std': np.std([x['p_r_f1'][0] for x in results], axis=0),
            'rec_mean': np.mean([x['p_r_f1'][1] for x in results], axis=0),
            'rec_std': np.std([x['p_r_f1'][1] for x in results], axis=0),
            'f1_mean': np.mean([x['p_r_f1'][2] for x in results], axis=0),
            'f1_std': np.std([x['p_r_f1'][2] for x in results], axis=0),
            'importance_mean': {k:v for k, v in zip(self.obia_base.get_vars(), np.mean(all_imp, axis=0))},
            'importance_std': {k:v for k, v in zip(self.obia_base.get_vars(), np.std(all_imp, axis=0))}
        }
        return models, summary, results

    def classify(self, model, perc=None, output_file=None, compress='NONE'):
        prg = tqdm(desc='Classification', total=len(self.obia_base.tiles))
        if isinstance(model, list):
            for t, L, X in self.obia_base.tiled_data(normalize=perc):
                prob = []
                for m in model:
                    prob.append(m.predict_proba(X))
                prob = np.prod(prob, axis=0)
                c = model[0].classes_[np.argmax(prob, axis=1)]
                self.obia_base.populate_map(t, L, c, output_file, compress)
                prg.update(1)
        else:
            for t,L,X in self.obia_base.tiled_data(normalize=perc):
                c = model.predict(X)
                self.obia_base.populate_map(t, L, c, output_file, compress)
                prg.update(1)
        return

#TEST CODE
def run_test(sample_folder):
    lst1 = '{}/output/S2_processed/T31PDL/*/*FEAT.tif'.format(sample_folder)
    obc = ObjectBasedClassifier('{}/output/segmentation/segmentation.tif'.format(sample_folder),
                                '{}/input/REF/ref_l2.shp'.format(sample_folder),
                                [lst1],
                                ['{}/input/THR/THR_SPOT6.tif'.format(sample_folder)],
                                ref_class_field=['class', 'Class_L1a'])

    obc.gen_k_folds(5, class_field='class')
    m, s, r = obc.train_RF(100, return_true_vs_pred=True)
    obc.classify(m, '{}/output/classification/firstmap_l1.tif'.format(sample_folder))

    d = {'model':m, 'results':r, 'summary':s}
    import pickle
    with open('{}/output/test_out.pkl'.format(sample_folder), 'wb') as f:
        pickle.dump(d, f)

    from Postprocessing import Report
    of = Report.generate_report_figures(
        '{}/output/classification/firstmap_l1.tif'.format(sample_folder),
        '{}/input/txt/palette_L0a.clr'.format(sample_folder), d['results'], d['summary'],
        '{}/output/reports'.format(sample_folder), 'Testou')

    with open('{}/output/test_out_figs.pkl'.format(sample_folder), 'wb') as f:
        pickle.dump(of, f)
    Report.generate_pdf(of, '{}/output/reports/firstmap_l1_report.pdf'.format(sample_folder),
                        'Testou')

    return m, s, r