import pickle
import warnings

import numpy as np
from tqdm import tqdm

from OBIA.OBIABase import *
from sklearn.model_selection import GroupShuffleSplit
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (confusion_matrix, accuracy_score, cohen_kappa_score,
                             precision_recall_fscore_support)
from Learning.SampleManagement import gen_k_folds, generate_samples_from_set


class ObjectBasedClassifier:
    def __init__(self, object_layer, time_series_list, user_feature_list,
                 reference_data=None, ref_class_field='class', ref_id_field='id'):
        self.obia_base = OBIABase(object_layer, ref_data=reference_data,
                                  ref_class_field=ref_class_field, ref_id_field=ref_id_field)
        for lst in time_series_list:
            self.obia_base.add_raster_time_series_for_stats(lst)
        for ras in user_feature_list:
            self.obia_base.add_raster_for_stats(ras)
        if reference_data is not None:
            self.obia_base.populate_ref_db()
            self.training_base = self.obia_base.get_reference_db_as_training_base(class_field=ref_class_field)
            self.training_base['folds'] = []
            self.training_base['dummy_ids'] = []

    def gen_k_folds(self, k, class_field='class', n_retries=10, augment=False,
                    min_samples_per_class=None):
        ok = False
        retry = 0
        while not ok and retry < n_retries:
            self.training_base['folds'], ok, problematic = gen_k_folds(
                self.training_base['X'], self.training_base[class_field],
                self.training_base['groups'], k)
            retry += 1
        if not ok:
            if not augment:
                raise Exception("Not all classes are present in each fold/split.\n"
                                "Please check that you have enough polygons (e.g. 2 x n_folds) per class.")
            # Augment under-represented classes with synthetic samples so that
            # every class can be present in every fold.
            if min_samples_per_class is None:
                min_samples_per_class = 2 * k
            n_samples_to_add = [min_samples_per_class -
                                len(np.unique(self.training_base['groups'][self.training_base[class_field] == c]))
                                for c in problematic]
            warnings.warn('Classes {} do not have enough groups to ensure sample presence in each fold. '
                          'Augmenting to {} samples.'.format(problematic, min_samples_per_class))
            curr_grp = np.max(self.training_base['groups']) + 1
            curr_id = np.max(self.training_base['obj_id']) + 1
            for c, n in zip(problematic, n_samples_to_add):
                x = self.training_base['X'][self.training_base[class_field] == c]
                s = generate_samples_from_set(x, n, 0.01)
                sc = c * np.ones(n)
                sg = curr_grp + np.arange(n)
                sid = curr_id + np.arange(n)
                self.training_base['X'] = np.vstack([self.training_base['X'], s])
                self.training_base[class_field] = np.concatenate([self.training_base[class_field], sc])
                self.training_base['groups'] = np.concatenate([self.training_base['groups'], sg])
                self.training_base['obj_id'] = np.concatenate([self.training_base['obj_id'], sid])
                # Track synthetic ids so they can be excluded from validation later.
                self.training_base['dummy_ids'] = np.concatenate([self.training_base['dummy_ids'], sid])
                curr_grp += n
                curr_id += n
            retry = 0
            while not ok and retry < n_retries:
                self.training_base['folds'], ok, problematic = gen_k_folds(
                    self.training_base['X'], self.training_base[class_field],
                    self.training_base['groups'], k)
                retry += 1
            if not ok:
                raise Exception("Still not ok after augmentation. Please provide more samples.")
        return  # To change!
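    # Assumed layout of self.training_base, inferred from its usage in this class
    # (the exact schema is defined by OBIABase.get_reference_db_as_training_base):
    #   'X'           -> (n_samples, n_features) feature matrix
    #   <class_field> -> per-sample class labels (default key: 'class')
    #   'groups'      -> per-sample group id (samples from one polygon share a group)
    #   'obj_id'      -> per-sample object id in the segmentation layer
    #   'folds'       -> list of (train_indices, test_indices) tuples
    #   'dummy_ids'   -> obj_ids of synthetic samples created by augmentation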
    def gen_hold_out(self, test_train_ratio=0.5, n_splits=1, class_field='class'):
        gss = GroupShuffleSplit(n_splits=n_splits, test_size=test_train_ratio)
        for tr_i, ts_i in gss.split(self.training_base['X'],
                                    self.training_base[class_field],
                                    self.training_base['groups']):
            self.training_base['folds'].append((tr_i, ts_i))
        # Check that all classes are present in all splits.
        n_classes = len(np.unique(self.training_base[class_field]))
        ok = True
        for f in self.training_base['folds']:
            ok &= (len(np.unique(self.training_base[class_field][f[0]])) == n_classes and
                   len(np.unique(self.training_base[class_field][f[1]])) == n_classes)
        if not ok:
            self.training_base['folds'] = []
            raise Exception("Not all classes are present in each split.\n"
                            "Please check that you have enough groups (e.g. 2 x (1/test_train_ratio)) per class.")

    def train_RF(self, n_estimators, class_field='class', return_true_vs_pred=False):
        assert 'folds' in self.training_base.keys()
        models = []
        results = []
        truelabs = np.array([])
        for tr_i, ts_i in tqdm(self.training_base['folds'], desc='Training'):
            models.append(RandomForestClassifier(n_estimators=n_estimators))
            models[-1].fit(self.training_base['X'][tr_i], self.training_base[class_field][tr_i])
            l, c = self.training_base['obj_id'][ts_i], models[-1].predict(self.training_base['X'][ts_i])
            # Remove dummy ids and the corresponding class labels (can lead to no samples in the test set!)
            c = np.delete(c, np.isin(l, self.training_base['dummy_ids']))
            l = np.delete(l, np.isin(l, self.training_base['dummy_ids']))
            y_true, y_pred = self.obia_base.true_pred_bypixel(l, c, class_field)
            truelabs = np.unique(np.concatenate((truelabs, y_true, y_pred)))
            results.append({
                'conf_matrix': confusion_matrix(y_true, y_pred, labels=np.unique(self.training_base[class_field])),
                'accuracy': accuracy_score(y_true, y_pred),
                'kappa': cohen_kappa_score(y_true, y_pred),
                'p_r_f1': precision_recall_fscore_support(y_true, y_pred, zero_division=0,
                                                          labels=np.unique(self.training_base[class_field])),
                'importances': models[-1].feature_importances_
            })
            if return_true_vs_pred:
                results[-1]['true_vs_pred'] = (y_true, y_pred)
        all_imp = np.vstack([x['importances'] for x in results])
        summary = {
            'accuracy_mean': np.mean([x['accuracy'] for x in results]),
            'accuracy_std': np.std([x['accuracy'] for x in results]),
            'kappa_mean': np.mean([x['kappa'] for x in results]),
            'kappa_std': np.std([x['kappa'] for x in results]),
            'prec_mean': np.mean([x['p_r_f1'][0] for x in results], axis=0),
            'prec_std': np.std([x['p_r_f1'][0] for x in results], axis=0),
            'rec_mean': np.mean([x['p_r_f1'][1] for x in results], axis=0),
            'rec_std': np.std([x['p_r_f1'][1] for x in results], axis=0),
            'f1_mean': np.mean([x['p_r_f1'][2] for x in results], axis=0),
            'f1_std': np.std([x['p_r_f1'][2] for x in results], axis=0),
            'importance_mean': {k: v for k, v in zip(self.obia_base.get_vars(), np.mean(all_imp, axis=0))},
            'importance_std': {k: v for k, v in zip(self.obia_base.get_vars(), np.std(all_imp, axis=0))},
            'actual_labels': list(truelabs)
        }
        return models, summary, results

    def classify(self, models, perc=None, output_files=None, compress='NONE'):
        if isinstance(output_files, str):
            models, output_files = [models], [output_files]
        prg = tqdm(desc='Classification', total=len(self.obia_base.tiles) * len(models))
        for t, L, X in self.obia_base.tiled_data(normalize=perc):
            for model, output_file in zip(models, output_files):
                if isinstance(model, list):
                    prob = []
                    for m in model:
                        prob.append(m.predict_proba(X))
                    prob = np.prod(prob, axis=0)
                    c = model[0].classes_[np.argmax(prob, axis=1)]
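                    # Note: the product of the per-model class probabilities acts as a
                    # consensus vote across the fold models; the argmax over the product
                    # selects the class the ensemble agrees on most strongly.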
                    self.obia_base.populate_map(t, L, c, output_file, compress)
                else:
                    c = model.predict(X)
                    self.obia_base.populate_map(t, L, c, output_file, compress)
                prg.update(1)
        prg.close()

    def save_training_base(self, fn):
        with open(fn, 'wb') as f:
            pickle.dump(self.training_base, f)


# TEST CODE
def run_test(sample_folder):
    lst1 = '{}/output/S2_processed/T31PDL/*/*FEAT.tif'.format(sample_folder)
    obc = ObjectBasedClassifier('{}/output/segmentation/segmentation.tif'.format(sample_folder),
                                [lst1],
                                ['{}/input/THR/THR_SPOT6.tif'.format(sample_folder)],
                                reference_data='{}/input/REF/ref_l2.shp'.format(sample_folder),
                                ref_class_field=['class', 'Class_L1a'])
    obc.gen_k_folds(5, class_field='class')
    m, s, r = obc.train_RF(100, return_true_vs_pred=True)
    obc.classify(m, output_files='{}/output/classification/firstmap_l1.tif'.format(sample_folder))
    d = {'model': m, 'results': r, 'summary': s}
    with open('{}/output/test_out.pkl'.format(sample_folder), 'wb') as f:
        pickle.dump(d, f)
    from Postprocessing import Report
    of = Report.generate_report_figures(
        '{}/output/classification/firstmap_l1.tif'.format(sample_folder),
        '{}/input/txt/palette_L0a.clr'.format(sample_folder),
        d['results'], d['summary'],
        '{}/output/reports'.format(sample_folder),
        'Testou')
    with open('{}/output/test_out_figs.pkl'.format(sample_folder), 'wb') as f:
        pickle.dump(of, f)
    Report.generate_pdf(of, '{}/output/reports/firstmap_l1_report.pdf'.format(sample_folder), 'Testou')
    return m, s, r
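
# Minimal command-line entry point (a sketch, not part of the original test code;
# it assumes the sample-folder layout expected by run_test above):
if __name__ == '__main__':
    import sys
    run_test(sys.argv[1])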