#!/usr/bin/python # -*- coding: utf-8 -*- import pandas as pd import geopandas as gpd import os, sys, time, shutil import glob import yaml try: from yaml import CLoader as Loader except ImportError: from yaml import Loader import numpy as np from indicators import Indicators from tqdm import tqdm import patutils from patutils import md5sum, load_pat_patches import seaborn as sns import matplotlib.pyplot as plt from scenarios import ScenariosStack, Reallocator import multiprocessing as mp from functools import partial class MCMC: def buildOutputDirName(config, suffix): dirname = config['output_dir'] + '/' + time.strftime('%Y%m%d-%H%M%S') dirname += '_{initial_nb_particles}_{step_nb_particles}_{ratio_patches_to_modify}'.format(**config) dirname += suffix return dirname def __init__(self, mcmc_config_filename, output=True, suffix='', outputDirName=None): if not os.path.isfile(mcmc_config_filename): print('Error: file not found "{}"'.format(mcmc_config_filename)) print('Please copy the template file "MCMC_config.sample.yml" and adjust to your settings and run again this program') sys.exit(1) self.mcmc_config = yaml.load(open(mcmc_config_filename,'r')) self.patches_md5sum = md5sum(self.mcmc_config['patches']) rngSeed = self.mcmc_config.get('rng_seed', 0) self.rng = np.random.RandomState(rngSeed) if output: if outputDirName: self.outputdir = outputDirName else: self.outputdir = MCMC.buildOutputDirName(self.mcmc_config, suffix) if not os.path.exists(self.outputdir): os.makedirs(self.outputdir) print('All data will be written in {}'.format(self.outputdir)) else: print('Output directory already exists! ({})'.format(self.outputdir)) sys.exit(1) # Copying some input files and scripts shutil.copy(mcmc_config_filename, self.outputdir) for f in [ self.mcmc_config['target'], self.mcmc_config['indicators_config']['resilience'], self.mcmc_config['indicators_config']['social']['cost_matrix_filename'] ]: shutil.copy(f, self.outputdir) os.makedirs(self.outputdir+'/py') for f in glob.glob(os.path.dirname(os.path.abspath(__file__))+'/*.py'): shutil.copy(f, self.outputdir+'/py') # Storing configuration data config_data = { 'rng_seed':rngSeed, 'patches_md5sum':self.patches_md5sum } for key,filename in [ ['biodiversity_matrix_md5sum', self.mcmc_config['indicators_config']['biodiversity']['matrixfilename']], ['social_patches_costs_md5sum', self.mcmc_config['indicators_config']['social']['patches_costs_filename']] ]: if os.path.isfile(filename): config_data[key] = md5sum(filename) with open(self.outputdir+'/config.yml', 'w') as outfile: yaml.dump(config_data, outfile, default_flow_style=False) # finishing init self.patches = load_pat_patches(self.mcmc_config['patches']) self.surfaces = self.patches['SURF_PARC'] self.target = pd.read_csv(self.mcmc_config['target'], sep=';',index_col=0).rename(index=patutils.code_cultgeopat) targetRatio = (self.target['2050']-self.target['2016'])/self.target['2016'] self.targetPAT = self.patches.groupby('cultgeopat')['SURF_PARC'].sum()*(1+targetRatio) self.indicators = Indicators(self.mcmc_config['indicators_config'], self.patches, self.patches_md5sum, self.targetPAT) self.reallocator = Reallocator(self.patches, self.targetPAT, self.mcmc_config['ratio_patches_to_modify']) def is_pareto_efficient(costs, return_mask = True): """ Find the pareto-efficient points :param costs: An (n_points, n_costs) array :param return_mask: True to return a mask :return: An array of indices of pareto-efficient points. If return_mask is True, this will be an (n_points, ) boolean array Otherwise it will be a (n_efficient_points, ) integer array of indices. Author: Source: https://stackoverflow.com/questions/32791911/fast-calculation-of-pareto-front-in-python """ is_efficient = np.arange(costs.shape[0]) n_points = costs.shape[0] next_point_index = 0 # Next index in the is_efficient array to search for while next_point_index1: # multiprocessing loop_data = [[self.getRNGState(iter_nb, loop_index), scen_id] for loop_index,scen_id in enumerate(scenarios.sample(nb_particles, self.rng).index)] results = list(tqdm(self.mp_pool.imap(partial(MCMC.reallocate_mp), loop_data, chunksize=10), total=len(loop_data))) # Consolidate data scenarios.reconstitute_mp(iter_nb, results, disable_progress=disable_progress) else: # Sequential data={'scenarios_patches':[], 'scenarios_cult':[], 'previous_indexes':[], 'fullreallocated':[]} for loop_index, [index, scenario] in tqdm(enumerate(scenarios.sample(nb_particles, self.rng).iterrows()), total=nb_particles, disable=disable_progress): rng = np.random.RandomState(self.getRNGState(iter_nb, loop_index)) [fullreallocated,scenario] = scenarios.reallocate(index, rng) data['fullreallocated'].append(fullreallocated) data['scenarios_patches'].append(scenario) data['scenarios_cult'].append(scenario['cultgeopat']) data['previous_indexes'].append(index) scenarios.reconstitute(iter_nb, **data, disable_progress=disable_progress) elapsed_time = time.time() - start_time print('Iteration duration: {} - {:.2f} scen/s '.format(time.strftime("%M:%S", time.gmtime(elapsed_time)), nb_particles/elapsed_time), end="", flush=True) # Computing pareto front start_time = time.time() pareto_mask = MCMC.is_pareto_efficient(scenarios.scores[['Resilience','Proximity','Productivity','Biodiversity','Social']].values) scenarios.scores['pareto'] = pareto_mask elapsed_time = time.time() - start_time print(' Fully reallocated scenarios {}/{} - '.format(scenarios.nbFullReallocated(), nb_particles), end="", flush=True) if (len(scenarios.scores)>0): print('{:.2f}% on pareto front (computed in {}) '.format(100*np.sum(pareto_mask)/len(pareto_mask), time.strftime("%M:%S", time.gmtime(elapsed_time))), end="", flush=True) # Writing output data if hasattr(self,'outputdir') and write_data: write_times=[] start_time = time.time() scenarios.scores.to_csv(self.outputdir+'/scores_iter_{0:03d}.csv'.format(iter_nb), index=True) write_times.append(time.time() - start_time) start_time = time.time() scenarios.cultgeopat.to_hdf(self.outputdir+'/cult_iter_{0:03d}.h5'.format(iter_nb), 'cultgeopat', index=True, complevel=1) write_times.append(time.time() - start_time) start_time = time.time() try: pairplot = sns.pairplot(scenarios.scores, vars=['Resilience','Proximity','Productivity','Biodiversity','Social','TargetDelta'], diag_kind="kde",hue="pareto" ) pairplot.savefig(self.outputdir+'/mcmc_iter_{0:03d}.png'.format(iter_nb)) plt.close(pairplot.fig) except: # when data doesn't vary enough on a dimension, it is impossible to generate the pareto density # and an error is triggered. Here, we ignore this error. pass write_times.append(time.time() - start_time) print(' - Scores written in {}/{}/{} '.format(*map(lambda x: time.strftime("%M:%S", time.gmtime(x)), write_times)), end="", flush=True) print() # Retaining only optimal particules scenarios.retain(pareto_mask) return scenarios def run(self): # Start with initial scenario global scenarios scenarios = ScenariosStack(self.indicators, self.reallocator, self.patches) num_cpus = self.mcmc_config.get('num_cpus', 1) if num_cpus > 1: import signal mp.set_start_method('fork', force=True) self.mp_pool = mp.Pool(self.mcmc_config['num_cpus'], # for catching correctly Ctrl-C keyboard interruption initializer=lambda:signal.signal(signal.SIGINT, signal.SIG_IGN)) # Iteration nb_iteration=0 for i in range(self.mcmc_config['max_iterations']): nb_iteration=i+1 print('Iteration #{}'.format(nb_iteration)) if nb_iteration==1: nb_particles = self.mcmc_config['initial_nb_particles'] else: nb_particles = self.mcmc_config['step_nb_particles'] self.step(nb_iteration, nb_particles, scenarios, write_data=self.mcmc_config.get('write_data', False), num_cpus=num_cpus) if scenarios.allFullReallocated(): print('All scenarios are fully reallocated. Simulation finished.') break return scenarios if __name__ == '__main__': suffix='' mcmc=None if len(sys.argv) > 1: suffix = '_' + sys.argv[1] try: config = yaml.load(open('MCMC_config.yml','r')) outputdir = MCMC.buildOutputDirName(config, suffix) mcmc = MCMC('MCMC_config.yml', suffix=suffix, outputDirName=outputdir) scenarios = mcmc.run() except KeyboardInterrupt: if mcmc and hasattr(mcmc, 'mp_pool'): mcmc.mp_pool.terminate() mcmc.mp_pool.join() print('Process interrupted.') answer = input('Would you like delete the output data generated? [y/N] ') if answer=='y': print('Deleting {}'.format(outputdir)) shutil.rmtree(outputdir) sys.exit(0)