Commit 848a529d authored by Dumoulin Nicolas's avatar Dumoulin Nicolas
Browse files

cleanup

parent 2b716426
#!/usr/bin/python
# -*- coding: utf-8 -*-
import numpy as np
from scipy.sparse import csr_matrix, lil_matrix
from math import exp,log
from tqdm import tqdm
class Biodiversity:
    '''
    Biodiversity indicator for a set of cultural patches.

    The indicator is based on the equivalent connected area described in
    (Saura et al., 2011):
    Santiago Saura, Christine Estreguil, Coralie Mouton, Monica Rodriguez-Freire,
    "Network analysis to assess landscape connectivity trends: Application to
    European forests (1990-2000)", Ecological Indicators, 11(2), 2011, 407-416.

    For each pair of patches a dispersal probability is computed with a
    decreasing exponential function P(i,j) = exp(-l*d). The coefficient l
    (lambda) is chosen so that P equals a given minimal probability (epsilon)
    at the maximal distance dmax.
    '''

    def __init__(self, dmax, epsilon=0.001):
        '''
        Creates an indicator with initial parameters.

        :param dmax: the maximum distance threshold for considering a pair of patches
        :param epsilon: the probability that the exponential function should
            yield at distance dmax
        '''
        self.dmax = dmax
        self.epsilon = epsilon

    def compute_dispersal_probabilities(self, patches, progress=False):
        '''
        Computes the matrix of probabilities with given data.

        The resulting (symmetric) matrix is stored in CSR sparse format in the
        ``probabilities`` attribute.

        :param patches: a GeoDataFrame object containing the cultural patches
        :param progress: when True, display a tqdm progress bar
        '''
        # Lambda such that exp(-l * dmax) == epsilon, as documented on the class.
        # BUGFIX: the previous code used log base 2 (log(epsilon, 2)), which made
        # the probability at dmax equal to epsilon**(1/ln 2) instead of epsilon.
        l = -log(self.epsilon) / self.dmax
        # Buffer the patches so that intersecting the spatial index finds every
        # patch lying within the distance threshold.
        buffer = patches.buffer(self.dmax)
        index = buffer.sindex
        # Sparse matrix initialisation (LIL format is efficient for incremental fill).
        self.probabilities = lil_matrix((len(patches), len(patches)), dtype='d')
        # BUGFIX: honour the `progress` parameter (it was previously ignored).
        for i in tqdm(range(len(patches)), disable=not progress):
            geom_i = patches.iloc[i].geometry
            for fid in index.intersection(geom_i.bounds):
                # The matrix is symmetric, so we process only one half and
                # assign the probability on both sides. The diagonal (i == fid,
                # hence d == 0) is intentionally kept nonzero: each patch
                # contributes area_i * area_i * l, as exercised by the tests.
                if i >= fid:
                    d = patches.iloc[fid].geometry.boundary.distance(geom_i.boundary)
                    if d <= self.dmax:
                        # Patches i and fid are closer than the distance threshold.
                        p = patches.iloc[fid].geometry.area * geom_i.area * l * exp(-l * d)
                        self.probabilities[i, fid] = p
                        self.probabilities[fid, i] = p
                    else:
                        self.probabilities[i, fid] = 0
                        self.probabilities[fid, i] = 0
        self.probabilities = self.probabilities.tocsr()

    def biodiversity(self, patches):
        '''
        Returns the indicator value: the probabilities summed over the meadow
        patches only (rows/columns where rpg_lfgc_s == 'Prairies').
        '''
        meadowmask = patches.rpg_lfgc_s == 'Prairies'
        return ((self.probabilities * meadowmask).T * meadowmask).sum()

    def save_probabilities(self, filename):
        '''
        Save the matrix of probabilities and initial parameters in a file (npz format).
        '''
        m = self.probabilities
        np.savez(filename, dmax=self.dmax, epsilon=self.epsilon, data=m.data,
                 indices=m.indices, indptr=m.indptr, shape=m.shape)

    def load_probabilities(self, filename):
        '''
        Load the matrix of probabilities and initial parameters from a file (npz format).
        '''
        loader = np.load(filename)
        self.dmax = loader['dmax']
        self.epsilon = loader['epsilon']
        self.probabilities = csr_matrix(
            (loader['data'], loader['indices'], loader['indptr']),
            shape=loader['shape']
        )
PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_GRS 1980(IUGG, 1980)",DATUM["D_unknown",SPHEROID["GRS80",6378137,298.257222101]],PRIMEM["Greenwich",0],UNIT["Degree",0.017453292519943295]],PROJECTION["Lambert_Conformal_Conic"],PARAMETER["standard_parallel_1",44],PARAMETER["standard_parallel_2",49],PARAMETER["latitude_of_origin",46.5],PARAMETER["central_meridian",3],PARAMETER["false_easting",700000],PARAMETER["false_northing",6600000],UNIT["Meter",1]]
\ No newline at end of file
PROJCS["unnamed",GEOGCS["GRS 1980(IUGG, 1980)",DATUM["unknown",SPHEROID["GRS80",6378137,298.257222101]],PRIMEM["Greenwich",0],UNIT["degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP"],PARAMETER["standard_parallel_1",44],PARAMETER["standard_parallel_2",49],PARAMETER["latitude_of_origin",46.5],PARAMETER["central_meridian",3],PARAMETER["false_easting",700000],PARAMETER["false_northing",6600000],UNIT["Meter",1]]
#!/usr/bin/python
# -*- coding: utf-8 -*-
import geopandas as gpd
import numpy as np
from scipy.sparse import csr_matrix, lil_matrix
from math import exp,log
from shapely.geometry import mapping, shape
import json
import fiona
class Biodiversity:
    '''
    Biodiversity indicator for a set of cultural patches and a set of forest patches.

    The indicator is computed with the equivalent connected area as described
    in (Saura et al., 2011):
    Santiago Saura, Christine Estreguil, Coralie Mouton, Monica Rodriguez-Freire,
    "Network analysis to assess landscape connectivity trends: Application to
    European forests (1990-2000)", Ecological Indicators, 11(2), 2011, 407-416.

    For each pair of patches a probability is computed with a decreasing
    exponential function P(i,j) = exp(-l*d). The coefficient l (lambda) is
    determined for a given minimal probability (epsilon) at the maximal
    distance dmax.
    '''

    def __init__(self, patches, forest, dmax, epsilon=0.001):
        '''
        Creates an indicator with initial data.

        :param patches: path to the shapefile containing the cultural patches
        :param forest: path to the shapefile containing the forest patches
        :param dmax: the maximum distance threshold for considering a pair of patches
        :param epsilon: the probability that the exponential function should
            yield at distance dmax
        '''
        self.patches = patches
        self.forest = forest
        self.dmax = dmax
        self.epsilon = epsilon

    def compute_dispersal_probabilities(self):
        '''
        Computes the matrix of probabilities.

        The matrix is stored with a sparse (LIL) matrix format in the
        ``probabilities`` attribute. It has one row per cultural patch and
        len(patches)+1 columns: the extra last column aggregates the
        contribution of all forest patches near each cultural patch.
        '''
        # Lambda such that exp(-l * dmax) == epsilon, as documented on the class.
        # BUGFIX: the previous code used log base 2 (log(epsilon, 2)), which made
        # the probability at dmax equal to epsilon**(1/ln 2) instead of epsilon.
        l = -log(self.epsilon) / self.dmax
        patches = self.patches
        forest = self.forest
        # Fusion of the 'patches' and 'forest' layers
        patches_fusion = fusion_patches_forest(patches, forest)
        # GeoDataFrame restricted to the cultural patches only
        patches = patches_to_list(patches)
        # Buffer for finding patches under the distance threshold by intersection
        buffer = patches_fusion.buffer(self.dmax)
        index = buffer.sindex  # spatial index
        # Sparse matrix initialisation; the extra column receives forest contributions.
        self.probabilities = lil_matrix((len(patches), len(patches) + 1), dtype='d')
        # BUGFIX: iterate over every cultural patch; the previous code looped
        # over a hard-coded range(500) left over from debugging.
        for i in range(len(patches)):
            geom_i = patches_fusion.iloc[i].geometry
            for fid in index.intersection(geom_i.bounds):
                d = patches_fusion.iloc[fid].geometry.boundary.distance(geom_i.boundary)
                if d <= self.dmax:
                    # Patches i and fid have a distance less than the max distance.
                    contribution = patches_fusion.iloc[fid].geometry.area * geom_i.area * l * exp(-l * d)
                    if fid >= len(patches):
                        # fid is a forest patch: accumulate into the extra column.
                        self.probabilities[i, len(patches)] += contribution
                    else:
                        self.probabilities[i, fid] = contribution
                elif fid < len(patches):
                    self.probabilities[i, fid] = 0
def fusion_patches_forest(patches, forest):
    '''
    Merge the cultural-patch layer and the forest layer into one GeoDataFrame.

    Cultural patches come first, forest patches after; each feature receives a
    sequential 'fid' property and keeps its source identifier as 'ID'.

    :param patches: path to the shapefile of cultural patches
    :param forest: path to the shapefile of forest patches
    '''
    merged = []
    fid = 0
    with fiona.open(forest, 'r') as forest_src:
        with fiona.open(patches, 'r') as patches_src:
            # Cultural patches: identifier field is ID_PARCEL.
            for feature in patches_src:
                merged.append({
                    "geometry": feature["geometry"],
                    "properties": {"ID": feature["properties"]["ID_PARCEL"], "fid": fid},
                })
                fid += 1
            # Forest patches: identifier field is ID.
            for feature in forest_src:
                merged.append({
                    "geometry": feature["geometry"],
                    "properties": {"ID": feature["properties"]["ID"], "fid": fid},
                })
                fid += 1
    return gpd.GeoDataFrame.from_features(merged)
def patches_to_list(patches):
    '''
    Load the cultural patches into a GeoDataFrame with normalized properties.

    Each feature keeps only an 'ID' (from ID_PARCEL) and a sequential 'fid'.

    :param patches: path to the shapefile of cultural patches
    '''
    features = []
    with fiona.open(patches, 'r') as source:
        for position, feature in enumerate(source):
            features.append({
                "geometry": feature["geometry"],
                "properties": {"ID": feature["properties"]["ID_PARCEL"], "fid": position},
            })
    return gpd.GeoDataFrame.from_features(features)
# Script entry point: build the dispersal-probability matrix for the PAT area
# and persist it. Input shapefile paths are relative to the working directory.
patches = './Parcelle_PAT.shp'
forest = './foret_PAT.shp'
# dmax = 1000 m, epsilon = 0.001 (see the Biodiversity class docstring).
s = Biodiversity(patches, forest, 1000, 0.001)
s.compute_dispersal_probabilities()
s.save_probabilities("./compute_dispersal_probabilities_1000.npz")
# Reload immediately as a sanity check of the saved matrix.
loader = s.load_probabilities("./compute_dispersal_probabilities_1000.npz")
PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_GRS 1980(IUGG, 1980)",DATUM["D_unknown",SPHEROID["GRS80",6378137,298.257222101]],PRIMEM["Greenwich",0],UNIT["Degree",0.017453292519943295]],PROJECTION["Lambert_Conformal_Conic"],PARAMETER["standard_parallel_1",44],PARAMETER["standard_parallel_2",49],PARAMETER["latitude_of_origin",46.5],PARAMETER["central_meridian",3],PARAMETER["false_easting",700000],PARAMETER["false_northing",6600000],UNIT["Meter",1]]
\ No newline at end of file
PROJCS["RGF93_Lambert_93",GEOGCS["GCS_RGF93",DATUM["D_RGF_1993",SPHEROID["GRS_1980",6378137,298.257222101]],PRIMEM["Greenwich",0],UNIT["Degree",0.017453292519943295]],PROJECTION["Lambert_Conformal_Conic"],PARAMETER["standard_parallel_1",49],PARAMETER["standard_parallel_2",44],PARAMETER["latitude_of_origin",46.5],PARAMETER["central_meridian",3],PARAMETER["false_easting",700000],PARAMETER["false_northing",6600000],UNIT["Meter",1]]
\ No newline at end of file
PROJCS["RGF93 / Lambert-93",GEOGCS["RGF93",DATUM["Reseau_Geodesique_Francais_1993",SPHEROID["GRS 1980",6378137,298.257222101,AUTHORITY["EPSG","7019"]],TOWGS84[0,0,0,0,0,0,0],AUTHORITY["EPSG","6171"]],PRIMEM["Greenwich",0,AUTHORITY["EPSG","8901"]],UNIT["degree",0.0174532925199433,AUTHORITY["EPSG","9122"]],AUTHORITY["EPSG","4171"]],PROJECTION["Lambert_Conformal_Conic_2SP"],PARAMETER["standard_parallel_1",49],PARAMETER["standard_parallel_2",44],PARAMETER["latitude_of_origin",46.5],PARAMETER["central_meridian",3],PARAMETER["false_easting",700000],PARAMETER["false_northing",6600000],UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["X",EAST],AXIS["Y",NORTH],AUTHORITY["EPSG","2154"]]
#!/usr/bin/python
# -*- coding: utf-8 -*-
import geopandas as gpd
import fiona
from shapely.geometry import mapping, shape, Polygon
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse import save_npz
def calcul_indice(matrix_file, scenario_file):
    '''
    Compute and print the biodiversity index for a scenario.

    :param matrix_file: npz file produced by Biodiversity.save_probabilities
        (len(scenario) rows, len(scenario)+1 columns; the last column holds the
        aggregated forest contributions)
    :param scenario_file: shapefile of the scenario's cultural patches
    '''
    loader = np.load(matrix_file)
    matrix = csr_matrix(
        (loader['data'], loader['indices'], loader['indptr']),
        shape=loader['shape']
    )
    # Buffer layer previously produced by buffer_scenario(); used as spatial index.
    buffer = gpd.GeoDataFrame.from_file('./buffer_scenario.shp')
    index = buffer.sindex
    with fiona.open(scenario_file, 'r') as input_scenario:
        sum_indicator = 0.0
        # FIDs of the patches of type 'Prairies'
        list_prairie = [i for i in range(len(input_scenario))
                        if input_scenario[i]["properties"]["rpg_lfgc_s"] == "Prairies"]
        # BUGFIX: iterate over the meadow FIDs themselves. The previous code
        # looped over range(len(list_prairie)) but indexed the scenario and the
        # matrix with the loop counter instead of the stored FID, so the wrong
        # rows were summed whenever the meadows were not the first features.
        for i in list_prairie:
            geom2 = shape(input_scenario[i]["geometry"])
            for fid in index.intersection(geom2.bounds):
                sum_indicator += matrix[i, fid]
            # Add the contribution of nearby forests, stored in the extra column.
            sum_indicator += matrix[i, len(input_scenario)]
    # NOTE(review): the raw sum is a large number that is hard to interpret;
    # normalising it might make the indicator more readable.
    print("L'indice de biodiversité est : " + str(sum_indicator))
def buffer_scenario(filename, size_buffer):
    '''
    Create the './buffer_scenario.shp' layer containing every feature of the
    input shapefile buffered by size_buffer.

    :param filename: path to the input scenario shapefile
    :param size_buffer: buffer radius, in the units of the layer's CRS
    '''
    with fiona.open(filename, 'r') as input_scenario:
        # Output schema: buffered polygons carrying only a sequential ID.
        schema = {'geometry': 'Polygon', 'properties': {'ID': 'int'}}
        with fiona.open('./buffer_scenario.shp', 'w',
                        driver=input_scenario.driver,
                        crs=input_scenario.crs,
                        schema=schema) as sink:
            for feature_id, feature in enumerate(input_scenario):
                sink.write({
                    "geometry": mapping(shape(feature["geometry"]).buffer(size_buffer)),
                    "properties": {"ID": feature_id},
                })
# Script entry point: compute the biodiversity index from the precomputed matrix.
matrix_filename = './compute_dispersal_probabilities_1000.npz'
scenario_filename = './Parcelle_PAT.shp'
# Run this first if './buffer_scenario.shp' does not exist yet in the working directory.
#buffer_scenario(scenario_filename, 1000) # Start this function if there is no buffer file in the repertory
calcul_indice(matrix_filename, scenario_filename)
import unittest
import geopandas as gpd
from biodiversity import Biodiversity
import os, tempfile
class TestBiodiversity(unittest.TestCase):
    '''Unit tests for the Biodiversity indicator.'''

    # GeoJSON literal with three meadow patches ('Prairies') in EPSG:2154.
    patches_geojson='''{
"type": "FeatureCollection",
"name": "zoneTest_rpg22",
"crs": { "type": "name", "properties": { "name": "urn:ogc:def:crs:EPSG::2154" } },
"features": [
{ "type": "Feature", "properties": { "num": "p1", "rpg_lfgc_s": "Prairies" }, "geometry": { "type": "MultiPolygon", "coordinates": [ [ [ [ 716054.303468722035177, 6513904.956056786701083 ], [ 715996.62831969128456, 6512280.745239074341953 ], [ 716997.508792816777714, 6511410.637597753666341 ], [ 717536.329653260996565, 6512483.81216886267066 ], [ 716054.303468722035177, 6513904.956056786701083 ] ] ] ] } },
{ "type": "Feature", "properties": { "num": "p2", "rpg_lfgc_s": "Prairies" }, "geometry": { "type": "MultiPolygon", "coordinates": [ [ [ [ 718590.234241131693125, 6517171.276583690196276 ], [ 717255.066031416645274, 6516173.056995783932507 ], [ 717572.744470257894136, 6515986.14173938613385 ], [ 718584.941813769983128, 6516364.755076318047941 ], [ 718590.234241131693125, 6517171.276583690196276 ] ] ] ] } },
{ "type": "Feature", "properties": { "num": "p3", "rpg_lfgc_s": "Prairies" }, "geometry": { "type": "MultiPolygon", "coordinates": [ [ [ [ 717737.436638569110073, 6512528.581713485531509 ], [ 717608.267519770306535, 6511688.558770489878953 ], [ 719565.843042769003659, 6511959.634334224276245 ], [ 719052.985686160507612, 6512598.809364158660173 ], [ 717737.436638569110073, 6512528.581713485531509 ] ] ] ] } }
]}'''

    def _load_patches(self):
        # The string is a trusted literal defined above, so eval is acceptable here.
        return gpd.GeoDataFrame.from_features(eval(self.patches_geojson))

    def test_compute_dispersal_probabilities(self):
        gdf = self._load_patches()
        self.assertEqual(len(gdf), 3)
        indicator = Biodiversity(1000)
        indicator.compute_dispersal_probabilities(gdf)
        self.assertEqual(len(indicator.probabilities.nonzero()[0]), 5)
        # Removing a meadow, creating a spatial gap
        remaining = gdf[gdf.num != "p1"]
        self.assertEqual(len(remaining), 2)
        indicator = Biodiversity(1000)
        indicator.compute_dispersal_probabilities(remaining)
        self.assertEqual(len(indicator.probabilities.nonzero()[0]), 2)
        # Removing a meadow, NOT creating a spatial gap
        remaining = gdf[gdf.num != "p2"]
        self.assertEqual(len(remaining), 2)
        indicator = Biodiversity(1000)
        indicator.compute_dispersal_probabilities(remaining)
        self.assertEqual(len(indicator.probabilities.nonzero()[0]), 4)

    def test_biodiversity(self):
        gdf = self._load_patches()
        indicator = Biodiversity(1000)
        indicator.compute_dispersal_probabilities(gdf)
        value_all = indicator.biodiversity(gdf)
        # Demoting a patch from 'Prairies' lowers the indicator.
        no_p2 = gdf.copy()
        no_p2.loc[gdf.num == 'p2', 'rpg_lfgc_s'] = 'None'
        value_no_p2 = indicator.biodiversity(no_p2)
        no_p1 = gdf.copy()
        no_p1.loc[gdf.num == 'p1', 'rpg_lfgc_s'] = 'None'
        value_no_p1 = indicator.biodiversity(no_p1)
        self.assertTrue(value_all > value_no_p2)
        self.assertTrue(value_no_p2 > value_no_p1)

    def test_save_load_probabilities(self):
        gdf = self._load_patches()
        saved = Biodiversity(1000, 0.01)
        saved.compute_dispersal_probabilities(gdf)
        handle = tempfile.NamedTemporaryFile()
        handle.close()
        filename = handle.name + '.npz'
        saved.save_probabilities(filename)
        loaded = Biodiversity(2000)
        loaded.load_probabilities(filename)
        os.unlink(filename)
        self.assertEqual((saved.probabilities - loaded.probabilities).sum(), 0.0)
        self.assertEqual(saved.dmax, loaded.dmax)
        self.assertEqual(saved.epsilon, loaded.epsilon)
if __name__ == '__main__':
    # Emit JUnit-style XML reports (for CI consumption) instead of plain text.
    import xmlrunner
    unittest.main(testRunner=xmlrunner.XMLTestRunner(output='test-reports'))
# Plotting the shapes with "num":
# (developer snippet kept as a string literal so it is never executed on import;
# paste it into an interactive session to visualise the test patches)
'''
patches = gpd.GeoDataFrame.from_features(eval(patches_geojson))
ax = patches.plot(color='#a6ffd7')
patches.buffer(1000).plot(facecolor='none',edgecolor='#a6ffd7', ax=ax)
points = patches.copy()
points.geometry = points.geometry.centroid
for x, y, label in zip(points.geometry.x, points.geometry.y, points.num):
    ax.text(x,y,label, fontsize=10)
'''
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment