Commit d018a8a3 authored by Dumoulin Nicolas's avatar Dumoulin Nicolas
Browse files

Multiprocessing is back with better management of rng seeds

Catching Ctrl-C keyboard interruption for clean exit
parent e7879bc5
This diff is collapsed.
...@@ -3,7 +3,12 @@ ...@@ -3,7 +3,12 @@
import pandas as pd import pandas as pd
import geopandas as gpd import geopandas as gpd
import os, sys, time, shutil import os, sys, time, shutil
import glob
import yaml import yaml
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
import numpy as np import numpy as np
from indicators import Indicators from indicators import Indicators
from tqdm import tqdm from tqdm import tqdm
...@@ -13,41 +18,54 @@ import seaborn as sns ...@@ -13,41 +18,54 @@ import seaborn as sns
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from scenarios import ScenariosStack, Reallocator from scenarios import ScenariosStack, Reallocator
import multiprocessing as mp
from functools import partial
class MCMC: class MCMC:
def __init__(self, mcmc_config_filename, output=True): def buildOutputDirName(config, suffix):
dirname = config['output_dir'] + '/' + time.strftime('%Y%m%d-%H%M%S')
dirname += '_{initial_nb_particles}_{step_nb_particles}_{ratio_patches_to_modify}'.format(**config)
dirname += suffix
return dirname
def __init__(self, mcmc_config_filename, output=True, suffix='', outputDirName=None):
if not os.path.isfile(mcmc_config_filename): if not os.path.isfile(mcmc_config_filename):
print('Error: file not found "{}"'.format(mcmc_config_filename)) print('Error: file not found "{}"'.format(mcmc_config_filename))
print('Please copy the template file "MCMC_config.sample.yml" and adjust to your settings and run again this program') print('Please copy the template file "MCMC_config.sample.yml" and adjust to your settings and run again this program')
sys.exit(1) sys.exit(1)
self.mcmc_config = yaml.load(open(mcmc_config_filename,'r')) self.mcmc_config = yaml.load(open(mcmc_config_filename,'r'))
self.patches_md5sum = md5sum(self.mcmc_config['patches']) self.patches_md5sum = md5sum(self.mcmc_config['patches'])
if 'rng_seed' in self.mcmc_config: rngSeed = self.mcmc_config.get('rng_seed', 0)
self.rng = np.random.RandomState(self.mcmc_config['rng_seed']) self.rng = np.random.RandomState(rngSeed)
else:
self.rng = np.random.RandomState(42)
print('MCMC initialized with default seed') # self.rng.get_state()
# Copying input data in output dir
self.outputdir = self.mcmc_config['output_dir'] + '/' + time.strftime('%Y%m%d-%H%M%S') + '_{initial_nb_particles}_{step_nb_particles}_{ratio_patches_to_modify}'.format(**self.mcmc_config)
if output: if output:
if outputDirName:
self.outputdir = outputDirName
else:
self.outputdir = MCMC.buildOutputDirName(self.mcmc_config, suffix)
if not os.path.exists(self.outputdir): if not os.path.exists(self.outputdir):
os.makedirs(self.outputdir) os.makedirs(self.outputdir)
print('All data will be written in {}'.format(self.outputdir)) print('All data will be written in {}'.format(self.outputdir))
else: else:
print('Output directory already exists! ({})'.format(self.outputdir)) print('Output directory already exists! ({})'.format(self.outputdir))
sys.exit(1) sys.exit(1)
# Copying some input files and scripts
shutil.copy(mcmc_config_filename, self.outputdir) shutil.copy(mcmc_config_filename, self.outputdir)
for f in [ self.mcmc_config['target'], for f in [ self.mcmc_config['target'],
self.mcmc_config['indicators_config']['resilience'], self.mcmc_config['indicators_config']['resilience'],
self.mcmc_config['indicators_config']['social']['cost_matrix_filename'] self.mcmc_config['indicators_config']['social']['cost_matrix_filename']
]: ]:
shutil.copy(f, self.outputdir) shutil.copy(f, self.outputdir)
with open(self.outputdir+'/seed.txt', 'w') as outfile: os.makedirs(self.outputdir+'/py')
outfile.write('{}\n'.format(self.rng.get_state())) for f in glob.glob(os.path.dirname(os.path.abspath(__file__))+'/*.py'):
config_data = { shutil.copy(f, self.outputdir+'/py')
'patches_md5sum':self.patches_md5sum, # Storing configuration data
'biodiversity_matrix_md5sum':md5sum(self.mcmc_config['indicators_config']['biodiversity']['matrixfilename']), config_data = { 'rng_seed':rngSeed, 'patches_md5sum':self.patches_md5sum }
'social_patches_costs_md5sum':md5sum(self.mcmc_config['indicators_config']['social']['patches_costs_filename']) for key,filename in [
} ['biodiversity_matrix_md5sum', self.mcmc_config['indicators_config']['biodiversity']['matrixfilename']],
['social_patches_costs_md5sum', self.mcmc_config['indicators_config']['social']['patches_costs_filename']]
]:
if os.path.isfile(filename):
config_data[key] = md5sum(filename)
with open(self.outputdir+'/config.yml', 'w') as outfile: with open(self.outputdir+'/config.yml', 'w') as outfile:
yaml.dump(config_data, outfile, default_flow_style=False) yaml.dump(config_data, outfile, default_flow_style=False)
# finishing init # finishing init
...@@ -57,7 +75,7 @@ class MCMC: ...@@ -57,7 +75,7 @@ class MCMC:
targetRatio = (self.target['2050']-self.target['2016'])/self.target['2016'] targetRatio = (self.target['2050']-self.target['2016'])/self.target['2016']
self.targetPAT = self.patches.groupby('cultgeopat')['SURF_PARC'].sum()*(1+targetRatio) self.targetPAT = self.patches.groupby('cultgeopat')['SURF_PARC'].sum()*(1+targetRatio)
self.indicators = Indicators(self.mcmc_config['indicators_config'], self.patches, self.patches_md5sum, self.targetPAT) self.indicators = Indicators(self.mcmc_config['indicators_config'], self.patches, self.patches_md5sum, self.targetPAT)
self.reallocator = Reallocator(self.rng, self.patches, self.targetPAT, self.mcmc_config['ratio_patches_to_modify']) self.reallocator = Reallocator(self.patches, self.targetPAT, self.mcmc_config['ratio_patches_to_modify'])
def is_pareto_efficient(costs, return_mask = True): def is_pareto_efficient(costs, return_mask = True):
...@@ -87,10 +105,20 @@ class MCMC: ...@@ -87,10 +105,20 @@ class MCMC:
else: else:
return is_efficient return is_efficient
def scenario_scores(patches, indicators): def getRNGState(self, iter_nb, scen_nb):
return indicators.compute_indicators(patches) result = 1 # First RNG is the default one for local sampling
if iter_nb==1:
result += scen_nb
else:
result += self.mcmc_config['initial_nb_particles'] + (iter_nb - 1) * self.mcmc_config['step_nb_particles']
return result
def reallocate_mp(rng_state_and_scen_id):
global scenarios
rng_state, scen_id = rng_state_and_scen_id
return scenarios.reallocate_mp(scen_id, np.random.RandomState(rng_state))
def step(self, iter_nb, nb_particles, scenarios, write_data=True, disable_progress=False): def step(self, iter_nb, nb_particles, scenarios, write_data=True, disable_progress=False, num_cpus=1):
''' '''
Sample new scenarios, evaluate their scores and retain only pareto front. Sample new scenarios, evaluate their scores and retain only pareto front.
...@@ -100,14 +128,23 @@ class MCMC: ...@@ -100,14 +128,23 @@ class MCMC:
''' '''
# Loop of sampling and scoring # Loop of sampling and scoring
start_time = time.time() start_time = time.time()
data={'scenarios_patches':[], 'scenarios_cult':[], 'previous_indexes':[], 'fullreallocated':[]} if num_cpus>1:
for index, scenario in tqdm(scenarios.sample(nb_particles, self.rng).iterrows(), total=nb_particles, disable=disable_progress): # multiprocessing
[fullreallocated,scenario] = scenarios.reallocate(index) loop_data = [[self.getRNGState(iter_nb, loop_index), scen_id] for loop_index,scen_id in enumerate(scenarios.sample(nb_particles, self.rng).index)]
data['fullreallocated'].append(fullreallocated) results = list(tqdm(self.mp_pool.imap(partial(MCMC.reallocate_mp), loop_data, chunksize=10), total=len(loop_data)))
data['scenarios_patches'].append(scenario) # Consolidate data
data['scenarios_cult'].append(scenario['cultgeopat']) scenarios.reconstitute_mp(iter_nb, results, disable_progress=disable_progress)
data['previous_indexes'].append(index) else:
scenarios.reconstitute(iter_nb, **data, disable_progress=disable_progress) # Sequential
data={'scenarios_patches':[], 'scenarios_cult':[], 'previous_indexes':[], 'fullreallocated':[]}
for loop_index, [index, scenario] in tqdm(enumerate(scenarios.sample(nb_particles, self.rng).iterrows()), total=nb_particles, disable=disable_progress):
rng = np.random.RandomState(self.getRNGState(iter_nb, loop_index))
[fullreallocated,scenario] = scenarios.reallocate(index, rng)
data['fullreallocated'].append(fullreallocated)
data['scenarios_patches'].append(scenario)
data['scenarios_cult'].append(scenario['cultgeopat'])
data['previous_indexes'].append(index)
scenarios.reconstitute(iter_nb, **data, disable_progress=disable_progress)
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time
print('Iteration duration: {} - {:.2f} scen/s '.format(time.strftime("%M:%S", time.gmtime(elapsed_time)), nb_particles/elapsed_time), end="", flush=True) print('Iteration duration: {} - {:.2f} scen/s '.format(time.strftime("%M:%S", time.gmtime(elapsed_time)), nb_particles/elapsed_time), end="", flush=True)
# Computing pareto front # Computing pareto front
...@@ -119,7 +156,7 @@ class MCMC: ...@@ -119,7 +156,7 @@ class MCMC:
if (len(scenarios.scores)>0): if (len(scenarios.scores)>0):
print('{:.2f}% on pareto front (computed in {}) '.format(100*np.sum(pareto_mask)/len(pareto_mask), time.strftime("%M:%S", time.gmtime(elapsed_time))), end="", flush=True) print('{:.2f}% on pareto front (computed in {}) '.format(100*np.sum(pareto_mask)/len(pareto_mask), time.strftime("%M:%S", time.gmtime(elapsed_time))), end="", flush=True)
# Writing output data # Writing output data
if write_data: if hasattr(self,'outputdir') and write_data:
write_times=[] write_times=[]
start_time = time.time() start_time = time.time()
scenarios.scores.to_csv(self.outputdir+'/scores_iter_{0:03d}.csv'.format(iter_nb), index=True) scenarios.scores.to_csv(self.outputdir+'/scores_iter_{0:03d}.csv'.format(iter_nb), index=True)
...@@ -147,7 +184,15 @@ class MCMC: ...@@ -147,7 +184,15 @@ class MCMC:
def run(self): def run(self):
# Start with initial scenario # Start with initial scenario
global scenarios
scenarios = ScenariosStack(self.indicators, self.reallocator, self.patches) scenarios = ScenariosStack(self.indicators, self.reallocator, self.patches)
num_cpus = self.mcmc_config.get('num_cpus', 1)
if num_cpus > 1:
import signal
mp.set_start_method('fork', force=True)
self.mp_pool = mp.Pool(self.mcmc_config['num_cpus'],
# for catching correctly Ctrl-C keyboard interruption
initializer=lambda:signal.signal(signal.SIGINT, signal.SIG_IGN))
# Iteration # Iteration
nb_iteration=0 nb_iteration=0
for i in range(self.mcmc_config['max_iterations']): for i in range(self.mcmc_config['max_iterations']):
...@@ -157,12 +202,29 @@ class MCMC: ...@@ -157,12 +202,29 @@ class MCMC:
nb_particles = self.mcmc_config['initial_nb_particles'] nb_particles = self.mcmc_config['initial_nb_particles']
else: else:
nb_particles = self.mcmc_config['step_nb_particles'] nb_particles = self.mcmc_config['step_nb_particles']
scenarios = self.step(nb_iteration, nb_particles, scenarios, write_data=self.mcmc_config.get('write_data', False)) self.step(nb_iteration, nb_particles, scenarios, write_data=self.mcmc_config.get('write_data', False), num_cpus=num_cpus)
if scenarios.allFullReallocated(): if scenarios.allFullReallocated():
print('All scenarios are fully reallocated. Simulation finished.') print('All scenarios are fully reallocated. Simulation finished.')
break break
return scenarios return scenarios
if __name__ == '__main__': if __name__ == '__main__':
mcmc = MCMC('MCMC_config.yml') suffix=''
scenarios = mcmc.run() mcmc=None
if len(sys.argv) > 1:
suffix = '_' + sys.argv[1]
try:
config = yaml.load(open('MCMC_config.yml','r'))
outputdir = MCMC.buildOutputDirName(config, suffix)
mcmc = MCMC('MCMC_config.yml', suffix=suffix, outputDirName=outputdir)
scenarios = mcmc.run()
except KeyboardInterrupt:
if mcmc and hasattr(mcmc, 'mp_pool'):
mcmc.mp_pool.terminate()
mcmc.mp_pool.join()
print('Process interrupted.')
answer = input('Would you like delete the output data generated? [y/N] ')
if answer=='y':
print('Deleting {}'.format(outputdir))
shutil.rmtree(outputdir)
sys.exit(0)
...@@ -10,7 +10,7 @@ if __name__ == '__main__': ...@@ -10,7 +10,7 @@ if __name__ == '__main__':
timeit.timeit(lambda: eval('mcmc.indicators._indicators[\''+ind+'\'].compute_indicator(mcmc.patches)'), number=10)/10 timeit.timeit(lambda: eval('mcmc.indicators._indicators[\''+ind+'\'].compute_indicator(mcmc.patches)'), number=10)/10
)) ))
print('\nreallocate: {:.4f} s'.format(timeit.timeit(lambda: print('\nreallocate: {:.4f} s'.format(timeit.timeit(lambda:
mcmc.reallocator.reallocate(mcmc.patches) mcmc.reallocator.reallocate(mcmc.patches, mcmc.rng)
, number=10)/10)) , number=10)/10))
for ind in ['Proximity', 'Resilience', 'Productivity', 'Biodiversity', 'Social']: for ind in ['Proximity', 'Resilience', 'Productivity', 'Biodiversity', 'Social']:
print('{}: {:.4f} s'.format(ind, print('{}: {:.4f} s'.format(ind,
......
...@@ -5,12 +5,12 @@ write_data: True ...@@ -5,12 +5,12 @@ write_data: True
initial_nb_particles: 1000 initial_nb_particles: 1000
step_nb_particles: 1000 step_nb_particles: 1000
max_iterations: 20 max_iterations: 20
multiprocessing: False num_cpus: 5
ratio_patches_to_modify: 0.01 ratio_patches_to_modify: 0.01
indicators_config: indicators_config:
proximite: proximite:
surf_totale_cible: 20165939.605135553 surf_totale_cible: 20165939.605135553
resilience: Parcelle_PAT/Grille_resilience.shp resilience: ../resources/grid/resilience_grid.geojson
biodiversity: biodiversity:
dmax: 1000 dmax: 1000
epsilon: 0.001 epsilon: 0.001
......
...@@ -7,15 +7,14 @@ from patutils import load_pat_patches ...@@ -7,15 +7,14 @@ from patutils import load_pat_patches
from tqdm import tqdm from tqdm import tqdm
class Reallocator: class Reallocator:
def __init__(self, rng, patches, targetPAT, ratioNbPatches): def __init__(self, patches, targetPAT, ratioNbPatches):
self.rng = rng
self.ratioNbPatches = ratioNbPatches self.ratioNbPatches = ratioNbPatches
self.targetPAT = targetPAT self.targetPAT = targetPAT
surfDelta = targetPAT - patches.groupby('cultgeopat')['SURF_PARC'].sum() surfDelta = targetPAT - patches.groupby('cultgeopat')['SURF_PARC'].sum()
self.PAT_cult_to_decrease = surfDelta[surfDelta<0].sort_values(ascending=True).keys().tolist() self.PAT_cult_to_decrease = surfDelta[surfDelta<0].sort_values(ascending=True).keys().tolist()
self.PAT_cult_to_increase = surfDelta[surfDelta>0].sort_values(ascending=False).keys().tolist() self.PAT_cult_to_increase = surfDelta[surfDelta>0].sort_values(ascending=False).keys().tolist()
def reallocate(self, patches): def reallocate(self, patches, rng):
nbPatches = int(len(patches)*self.ratioNbPatches) nbPatches = int(len(patches)*self.ratioNbPatches)
surfDelta = self.targetPAT - patches.groupby('cultgeopat')['SURF_PARC'].sum() surfDelta = self.targetPAT - patches.groupby('cultgeopat')['SURF_PARC'].sum()
cult_to_decrease = surfDelta.loc[self.PAT_cult_to_decrease][surfDelta<0].sort_values(ascending=True).keys().tolist() cult_to_decrease = surfDelta.loc[self.PAT_cult_to_decrease][surfDelta<0].sort_values(ascending=True).keys().tolist()
...@@ -25,14 +24,14 @@ class Reallocator: ...@@ -25,14 +24,14 @@ class Reallocator:
return None return None
# Sampling the patches to reallocate # Sampling the patches to reallocate
patches_to_decrease = patches[patches['cultgeopat'].isin(cult_to_decrease)] patches_to_decrease = patches[patches['cultgeopat'].isin(cult_to_decrease)]
samples = patches_to_decrease.sample(n=min(nbPatches,len(patches_to_decrease)), random_state=self.rng)#.reset_index(drop=True) samples = patches_to_decrease.sample(n=min(nbPatches,len(patches_to_decrease)), random_state=rng)#.reset_index(drop=True)
# Building the new culture reallocated # Building the new culture reallocated
factors = surfDelta[cult_to_increase] factors = surfDelta[cult_to_increase]
factors = (factors*len(samples)/factors.sum()).map(round) # normalize on nb samples factors = (factors*len(samples)/factors.sum()).map(round) # normalize on nb samples
newCult = pd.Series(cult_to_increase).repeat(factors) newCult = pd.Series(cult_to_increase).repeat(factors)
if len(newCult) < len(samples): # may be due to factors rounding if len(newCult) < len(samples): # may be due to factors rounding
newCult = newCult.append(newCult.sample(n=len(samples)-len(newCult), random_state=self.rng), ignore_index=True) newCult = newCult.append(newCult.sample(n=len(samples)-len(newCult), random_state=rng), ignore_index=True)
newCult = newCult.sample(frac=1, random_state=self.rng)[:len(samples)].reset_index(drop=True) # shuffle and cut extra elements newCult = newCult.sample(frac=1, random_state=rng)[:len(samples)].reset_index(drop=True) # shuffle and cut extra elements
# Doing the reallocation # Doing the reallocation
patches.loc[samples.index.values,'cultgeopat'] = newCult.values patches.loc[samples.index.values,'cultgeopat'] = newCult.values
surfDelta = self.targetPAT - patches.groupby('cultgeopat')['SURF_PARC'].sum() surfDelta = self.targetPAT - patches.groupby('cultgeopat')['SURF_PARC'].sum()
...@@ -111,9 +110,30 @@ class ScenariosStack: ...@@ -111,9 +110,30 @@ class ScenariosStack:
selected_data = self.initial_patches[columns] selected_data = self.initial_patches[columns]
return pd.concat([self.cultgeopat.loc[id].to_frame("cultgeopat"), selected_data],axis=1) return pd.concat([self.cultgeopat.loc[id].to_frame("cultgeopat"), selected_data],axis=1)
def reallocate(self, scen_id): def reallocate(self, scen_id, rng):
if self.isFullReallocated(scen_id): if self.isFullReallocated(scen_id):
return None return None
else: else:
patches = self.consolidate_scenario(scen_id, ['SURF_PARC', 'Bdv','init_cult', 'VALEUR_CAD']) patches = self.consolidate_scenario(scen_id, ['SURF_PARC', 'Bdv','init_cult', 'VALEUR_CAD', 'elevation'])
return self.reallocator.reallocate(patches) return self.reallocator.reallocate(patches, rng)
def reconstitute_mp(self, iter_nb, data, disable_progress=False):
indexes = [next(self._counter) for id in range(len(data))]
scenarios_cult = [d[2]['cultgeopat'] for d in data]
self.cultgeopat = pd.concat([self.cultgeopat[self.scores['full_reallocated']==True],
pd.DataFrame(scenarios_cult, index=indexes, columns=self.cultgeopat.columns, dtype='int8')
], sort=False)
list_scores = []
for d in tqdm(data, total=len(data), disable=disable_progress):
previous_index=d[0]
scenario_patch = d[2]
list_scores.append(d[3] + [iter_nb, self.scores.loc[previous_index]['cumulated_it']+1, previous_index, False, d[1]])
self.initscores(list_scores, indexes)
def reallocate_mp(self, scen_id, rng):
if self.isFullReallocated(scen_id):
return None
else:
patches = self.consolidate_scenario(scen_id, ['SURF_PARC', 'Bdv','init_cult', 'VALEUR_CAD', 'elevation'])
[fullreallocated,scenario] = self.reallocator.reallocate(patches, rng)
return [scen_id, fullreallocated,scenario, self.indicators.compute_indicators(scenario)]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment