import glob import numpy as np import pandas as pd from OBIA.OBIABase import * from sklearn.model_selection import StratifiedGroupKFold, GroupShuffleSplit from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import confusion_matrix, accuracy_score, cohen_kappa_score, precision_recall_fscore_support class ObjectBasedClassifier: def __init__(self, object_layer, time_series_list, user_feature_list, reference_data=None, ref_class_field='class', ref_id_field='id'): self.obia_base = OBIABase(object_layer, ref_data=reference_data, ref_class_field=ref_class_field, ref_id_field=ref_id_field) for lst in time_series_list: self.obia_base.add_raster_time_series_for_stats(lst) for ras in user_feature_list: self.obia_base.add_raster_for_stats(ras) if reference_data is not None: self.obia_base.populate_ref_db() self.training_base = self.obia_base.get_reference_db_as_training_base(class_field=ref_class_field) self.training_base['folds'] = [] return def gen_k_folds(self, k, class_field='class'): sgk = StratifiedGroupKFold(n_splits=k, shuffle=True) for tr_i, ts_i in sgk.split(self.training_base['X'], self.training_base[class_field], self.training_base['groups']): self.training_base['folds'].append((tr_i, ts_i)) # check if all classes are in all splits n_classes = len(np.unique(self.training_base[class_field])) ok = True for f in self.training_base['folds']: ok &= (len(np.unique(self.training_base[class_field][f[0]])) == n_classes and len(np.unique(self.training_base[class_field][f[1]])) == n_classes) if not ok: self.training_base['folds'] = [] raise Exception("Not all classes are present in each fold/split.\n" "Please check that you have enough groups (e.g. 2 x n_folds) per class.") return def gen_hold_out(self, test_train_ratio=0.5, n_splits=1, class_field='class'): gss = GroupShuffleSplit(n_splits=n_splits, test_size=test_train_ratio) for tr_i, ts_i in gss.split(self.training_base['X'], self.training_base[class_field], self.training_base['groups']): self.training_base['folds'].append((tr_i, ts_i)) # check if all classes are in all splits n_classes = len(np.unique(self.training_base[class_field])) ok = True for f in self.training_base['folds']: ok &= (len(np.unique(self.training_base[class_field][f[0]])) == n_classes and len(np.unique(self.training_base[class_field][f[1]])) == n_classes) if not ok: self.training_base['folds'] = [] raise Exception("Not all classes are present in each split.\n" "Please check that you have enough groups (e.g. 2 x (1/test_train_ratio)) per class.") def train_RF(self, n_estimators, class_field='class', return_true_vs_pred=False): assert('folds' in self.training_base.keys()) models = [] results = [] yt_yp = [] for tr_i, ts_i in tqdm(self.training_base['folds'], desc='Training'): models.append(RandomForestClassifier(n_estimators=n_estimators)) models[-1].fit(self.training_base['X'][tr_i], self.training_base[class_field][tr_i]) l, c = self.training_base['obj_id'][ts_i], models[-1].predict(self.training_base['X'][ts_i]) y_true, y_pred = self.obia_base.true_pred_bypixel(l, c, class_field) results.append( { 'conf_matrix': confusion_matrix(y_true, y_pred), 'accuracy': accuracy_score(y_true, y_pred), 'kappa' : cohen_kappa_score(y_true, y_pred), 'p_r_f1': precision_recall_fscore_support(y_true, y_pred, zero_division=0), 'importances' : models[-1].feature_importances_ } ) if return_true_vs_pred: results[-1]['true_vs_pred'] = (y_true, y_pred) all_imp = np.vstack([x['importances'] for x in results]) summary = { 'accuracy_mean': np.mean([x['accuracy'] for x in results]), 'accuracy_std': np.std([x['accuracy'] for x in results]), 'kappa_mean': np.mean([x['kappa'] for x in results]), 'kappa_std': np.std([x['kappa'] for x in results]), 'prec_mean': np.mean([x['p_r_f1'][0] for x in results], axis=0), 'prec_std': np.std([x['p_r_f1'][0] for x in results], axis=0), 'rec_mean': np.mean([x['p_r_f1'][1] for x in results], axis=0), 'rec_std': np.std([x['p_r_f1'][1] for x in results], axis=0), 'f1_mean': np.mean([x['p_r_f1'][2] for x in results], axis=0), 'f1_std': np.std([x['p_r_f1'][2] for x in results], axis=0), 'importance_mean': {k:v for k, v in zip(self.obia_base.get_vars(), np.mean(all_imp, axis=0))}, 'importance_std': {k:v for k, v in zip(self.obia_base.get_vars(), np.std(all_imp, axis=0))} } return models, summary, results def classify(self, model, perc=None, output_file=None, compress='NONE'): prg = tqdm(desc='Classification', total=len(self.obia_base.tiles)) if isinstance(model, list): for t, L, X in self.obia_base.tiled_data(normalize=perc): prob = [] for m in model: prob.append(m.predict_proba(X)) prob = np.prod(prob, axis=0) c = model[0].classes_[np.argmax(prob, axis=1)] self.obia_base.populate_map(t, L, c, output_file, compress) prg.update(1) else: for t,L,X in self.obia_base.tiled_data(normalize=perc): c = model.predict(X) self.obia_base.populate_map(t, L, c, output_file, compress) prg.update(1) return #TEST CODE def run_test(sample_folder): lst1 = '{}/output/S2_processed/T31PDL/*/*FEAT.tif'.format(sample_folder) obc = ObjectBasedClassifier('{}/output/segmentation/segmentation.tif'.format(sample_folder), '{}/input/REF/ref_l2.shp'.format(sample_folder), [lst1], ['{}/input/THR/THR_SPOT6.tif'.format(sample_folder)], ref_class_field=['class', 'Class_L1a']) obc.gen_k_folds(5, class_field='class') m, s, r = obc.train_RF(100, return_true_vs_pred=True) obc.classify(m, '{}/output/classification/firstmap_l1.tif'.format(sample_folder)) d = {'model':m, 'results':r, 'summary':s} import pickle with open('{}/output/test_out.pkl'.format(sample_folder), 'wb') as f: pickle.dump(d, f) from Postprocessing import Report of = Report.generate_report_figures( '{}/output/classification/firstmap_l1.tif'.format(sample_folder), '{}/input/txt/palette_L0a.clr'.format(sample_folder), d['results'], d['summary'], '{}/output/reports'.format(sample_folder), 'Testou') with open('{}/output/test_out_figs.pkl'.format(sample_folder), 'wb') as f: pickle.dump(of, f) Report.generate_pdf(of, '{}/output/reports/firstmap_l1_report.pdf'.format(sample_folder), 'Testou') return m, s, r