Unverified Commit ad441e0a authored by Julien Veyssier's avatar Julien Veyssier
Browse files

bring multiprocess back in step2 for windows


Signed-off-by: default avatarJulien Veyssier <eneiluj@posteo.net>
parent 61094506
...@@ -28,9 +28,8 @@ from osgeo.gdalnumeric import * ...@@ -28,9 +28,8 @@ from osgeo.gdalnumeric import *
from gdalconst import * from gdalconst import *
from osgeo import ogr from osgeo import ogr
if platform.system() != 'Windows': import multiprocessing
import multiprocessing from multiprocessing import Pool, cpu_count
from multiprocessing import Pool, cpu_count
MY_ABS_PATH=os.path.abspath(__file__) MY_ABS_PATH=os.path.abspath(__file__)
MY_DIR=os.path.dirname(MY_ABS_PATH) MY_DIR=os.path.dirname(MY_ABS_PATH)
...@@ -398,15 +397,12 @@ def cut_streams_at_gauges(directory_out, gauges_reloc_name): ...@@ -398,15 +397,12 @@ def cut_streams_at_gauges(directory_out, gauges_reloc_name):
station_lyr = station_shp.CopyLayer(gauges_lyr, gauges_reloc_name) station_lyr = station_shp.CopyLayer(gauges_lyr, gauges_reloc_name)
def processReach(params): def processReach(params):
id, configFileDir, nbProc, useMultiProcess = params id, configFileDir, nbProc = params
# define which grass env we are using # define which grass env we are using
if useMultiProcess: current = multiprocessing.current_process()
current = multiprocessing.current_process() processN = int(current._identity[0])
processN = int(current._identity[0]) # because multiprocess does not reset workers id numbers and we still use [1, N]
# because multiprocess does not reset workers id numbers and we still use [1, N] processN = (processN % nbProc) + 1
processN = (processN % nbProc) + 1
else:
processN = 1
location = 'hru-delin_%s' % (processN) location = 'hru-delin_%s' % (processN)
os.environ['GISRC'] = os.path.join(configFileDir, 'grass_db', 'grassdata', location, '.grassrc') os.environ['GISRC'] = os.path.join(configFileDir, 'grass_db', 'grassdata', location, '.grassrc')
...@@ -423,10 +419,12 @@ def processReach(params): ...@@ -423,10 +419,12 @@ def processReach(params):
''' '''
def main(parms_file, nbProc, generator=False): def main(parms_file, nbProc, generator=False):
# multiprocessing does not work on windows
if platform.system() == 'Windows':
nbProc = 1
print('---------- HRU-delin Step 2 started ---------------------------------------------') print('---------- HRU-delin Step 2 started ---------------------------------------------')
if platform.system() == 'Windows':
path = os.path.abspath(os.path.join(sys.exec_prefix, '../../bin/pythonw3.exe'))
multiprocessing.set_executable(path)
sys.argv = [None]
print('fix in core again %s'%os.path.abspath(os.path.join(sys.exec_prefix, '../../bin/pythonw3.exe')))
# TODO remove dirty global variables # TODO remove dirty global variables
global logf, gauge_code, gauge_area, reloc_lyr, col_name global logf, gauge_code, gauge_area, reloc_lyr, col_name
...@@ -634,16 +632,10 @@ def main(parms_file, nbProc, generator=False): ...@@ -634,16 +632,10 @@ def main(parms_file, nbProc, generator=False):
importRastersInEnv(rastersForWorkers, grassDbPath, location) importRastersInEnv(rastersForWorkers, grassDbPath, location)
nbReachs = len(reach_ids) nbReachs = len(reach_ids)
if platform.system() == 'Windows': if generator:
# on windows (qgis or batch) we don't use multiprocessing print('Starting reach loop with %s process' % nbProc)
params = [(id, configFileDir, nbProc, False) for (i, id) in enumerate(reach_ids)]
results = []
for i, parm in enumerate(params, 1):
results.append(processReach(parm))
yield (i/nbReachs*100)
elif generator:
with Pool(nbProc) as p: with Pool(nbProc) as p:
params = [(id, configFileDir, nbProc, True) for (i, id) in enumerate(reach_ids)] params = [(id, configFileDir, nbProc) for (i, id) in enumerate(reach_ids)]
results = [] results = []
for i, _ in enumerate(p.imap_unordered(processReach, params), 1): for i, _ in enumerate(p.imap_unordered(processReach, params), 1):
results.append(_) results.append(_)
...@@ -652,7 +644,7 @@ def main(parms_file, nbProc, generator=False): ...@@ -652,7 +644,7 @@ def main(parms_file, nbProc, generator=False):
# this is the interesting part, launching N processes in parallel to process basins # this is the interesting part, launching N processes in parallel to process basins
# the locks are here to prevent concurrent terminal tqdm writing # the locks are here to prevent concurrent terminal tqdm writing
with Pool(nbProc, initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as p: with Pool(nbProc, initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as p:
params = [(id, configFileDir, nbProc, True) for (i, id) in enumerate(reach_ids)] params = [(id, configFileDir, nbProc) for (i, id) in enumerate(reach_ids)]
results = list(tqdm(p.imap_unordered(processReach, params), results = list(tqdm(p.imap_unordered(processReach, params),
desc='[main process] get reach id => subbasins id [%s process] ' % nbProc, desc='[main process] get reach id => subbasins id [%s process] ' % nbProc,
total=nbReachs, total=nbReachs,
...@@ -741,14 +733,16 @@ if __name__ == '__main__': ...@@ -741,14 +733,16 @@ if __name__ == '__main__':
if len(sys.argv) > 2: if len(sys.argv) > 2:
nbProcArg = sys.argv[2] nbProcArg = sys.argv[2]
if platform.system() != 'Windows': # determine how many processes we can launch
# determine how many processes we can launch if str(nbProcArg).isnumeric() and int(nbProcArg) > 0:
if str(nbProcArg).isnumeric() and int(nbProcArg) > 0: nbProc = int(nbProcArg)
nbProc = int(nbProcArg)
else:
nbProc = cpu_count()
else: else:
nbProc = 1 nbProc = cpu_count()
if platform.system() == 'Windows':
path = os.path.abspath(os.path.join(sys.exec_prefix, '../../bin/pythonw3.exe'))
multiprocessing.set_executable(path)
sys.argv = [None]
# main is a generator but we don't use it here # main is a generator but we don't use it here
for pc in main(parms_file, nbProc, False): for pc in main(parms_file, nbProc, False):
...@@ -760,4 +754,9 @@ if __name__ == '__main__': ...@@ -760,4 +754,9 @@ if __name__ == '__main__':
pass pass
else: else:
from .parallel import buildGrassEnv, buildGrassLocation, exportRasters, importRastersInEnv from .parallel import buildGrassEnv, buildGrassLocation, exportRasters, importRastersInEnv
from .progressColors import * from .progressColors import *
\ No newline at end of file if platform.system() == 'Windows':
path = os.path.abspath(os.path.join(sys.exec_prefix, '../../bin/pythonw3.exe'))
multiprocessing.set_executable(path)
sys.argv = [None]
print('fix in core %s'%os.path.abspath(os.path.join(sys.exec_prefix, '../../bin/pythonw3.exe')))
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment