Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
Commandre Benjamin
TELIS
Commits
60ed6d35
Commit
60ed6d35
authored
Jan 23, 2019
by
Commandre Benjamin
Browse files
Commit initial
parents
Changes
10
Hide whitespace changes
Inline
Side-by-side
app/Archive.py
0 → 100644
View file @
60ed6d35
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import
os
,
sys
,
glob
,
re
,
io
import
math
,
subprocess
import
urllib.request
import
zipfile
import
requests
import
configparser
from
osgeo
import
ogr
,
gdal
import
numpy
as
np
from
uuid
import
uuid4
from
collections
import
defaultdict
,
UserDict
import
app.Satellites
as
Satellites
import
app.Outils
as
Outils
class
Archive
():
"""
Class to list, download and unpack Theia image archive because of a shapefile (box).
This shapefile get extent of the area.
:param captor: Name of the satellite (ex: Landsat or SpotWorldHeritage ...).
Name used to the url on website Theia Land
:type captor: str
:param list_year: Processing's year (string for one year)
:type list_year: list of str
:param box: Path of the study area
:type box: str
:param folder: Path of the source folder
:type folder: str
:param repertory: Name of the archive's folder
:type repertory: str
"""
def
__init__
(
self
,
captor
,
niveau
,
emprise
,
repertory
,
start_year
,
end_year
):
"""
Create a new 'Archive' instance
"""
self
.
_captor
=
captor
self
.
niveau
=
niveau
self
.
_list_year
=
[]
for
i
in
range
(
int
(
start_year
),
end_year
)
:
self
.
_list_year
.
append
(
i
)
self
.
_list_year
.
append
(
end_year
)
self
.
_box
=
emprise
self
.
_repertory
=
repertory
self
.
emprise
=
emprise
self
.
list_archive
=
[]
self
.
logger
=
Outils
.
Log
(
"log"
,
"Archive"
)
self
.
server
=
Satellites
.
SATELLITE
[
self
.
_captor
][
"server"
]
self
.
resto
=
Satellites
.
SATELLITE
[
self
.
_captor
][
"resto"
]
self
.
token_type
=
Satellites
.
SATELLITE
[
self
.
_captor
][
"token_type"
]
self
.
url
=
''
def
utm_to_latlng
(
self
,
zone
,
easting
,
northing
,
northernHemisphere
=
True
):
"""
Function to convert UTM to geographic coordinates
@param zone: UTM zone
@type zone: int
@param easting: Coordinates UTM in x
@type easting: float
@param northing: Coordinates UTM in y
@type northing: float
@param northernHemisphere: North hemisphere or not
@type northernHemisphere: boolean
:returns: tuple -- integer on the **longitude** and **latitude**
Source : http://www.ibm.com/developerworks/java/library/j-coordconvert/index.html
"""
if
not
northernHemisphere
:
northing
=
10000000
-
northing
a
=
6378137
e
=
0.081819191
e1sq
=
0.006739497
k0
=
0.9996
arc
=
northing
/
k0
mu
=
arc
/
(
a
*
(
1
-
math
.
pow
(
e
,
2
)
/
4.0
-
3
*
math
.
pow
(
e
,
4
)
/
64.0
-
5
*
math
.
pow
(
e
,
6
)
/
256.0
))
ei
=
(
1
-
math
.
pow
((
1
-
e
*
e
),
(
1
/
2.0
)))
/
(
1
+
math
.
pow
((
1
-
e
*
e
),
(
1
/
2.0
)))
ca
=
3
*
ei
/
2
-
27
*
math
.
pow
(
ei
,
3
)
/
32.0
cb
=
21
*
math
.
pow
(
ei
,
2
)
/
16
-
55
*
math
.
pow
(
ei
,
4
)
/
32
cc
=
151
*
math
.
pow
(
ei
,
3
)
/
96
cd
=
1097
*
math
.
pow
(
ei
,
4
)
/
512
phi1
=
mu
+
ca
*
math
.
sin
(
2
*
mu
)
+
cb
*
math
.
sin
(
4
*
mu
)
+
cc
*
math
.
sin
(
6
*
mu
)
+
cd
*
math
.
sin
(
8
*
mu
)
n0
=
a
/
math
.
pow
((
1
-
math
.
pow
((
e
*
math
.
sin
(
phi1
)),
2
)),
(
1
/
2.0
))
r0
=
a
*
(
1
-
e
*
e
)
/
math
.
pow
((
1
-
math
.
pow
((
e
*
math
.
sin
(
phi1
)),
2
)),
(
3
/
2.0
))
fact1
=
n0
*
math
.
tan
(
phi1
)
/
r0
_a1
=
500000
-
easting
dd0
=
_a1
/
(
n0
*
k0
)
fact2
=
dd0
*
dd0
/
2
t0
=
math
.
pow
(
math
.
tan
(
phi1
),
2
)
Q0
=
e1sq
*
math
.
pow
(
math
.
cos
(
phi1
),
2
)
fact3
=
(
5
+
3
*
t0
+
10
*
Q0
-
4
*
Q0
*
Q0
-
9
*
e1sq
)
*
math
.
pow
(
dd0
,
4
)
/
24
fact4
=
(
61
+
90
*
t0
+
298
*
Q0
+
45
*
t0
*
t0
-
252
*
e1sq
-
3
*
Q0
*
Q0
)
*
math
.
pow
(
dd0
,
6
)
/
720
lof1
=
_a1
/
(
n0
*
k0
)
lof2
=
(
1
+
2
*
t0
+
Q0
)
*
math
.
pow
(
dd0
,
3
)
/
6.0
lof3
=
(
5
-
2
*
Q0
+
28
*
t0
-
3
*
math
.
pow
(
Q0
,
2
)
+
8
*
e1sq
+
24
*
math
.
pow
(
t0
,
2
))
*
math
.
pow
(
dd0
,
5
)
/
120
_a2
=
(
lof1
-
lof2
+
lof3
)
/
math
.
cos
(
phi1
)
_a3
=
_a2
*
180
/
math
.
pi
latitude
=
180
*
(
phi1
-
fact1
*
(
fact2
+
fact3
+
fact4
))
/
math
.
pi
if
not
northernHemisphere
:
latitude
=
-
latitude
longitude
=
((
zone
>
0
)
and
(
6
*
zone
-
183.0
)
or
3.0
)
-
_a3
return
(
longitude
,
latitude
)
def
coord_box_dd
(
self
):
"""
Function to get area's coordinates of shapefile
:returns: str -- **area_coord_corner** : Area coordinates corner
--> Left bottom on x, Left bottom on y, Right top on x, Right top on y
"""
# Processus to convert the UTM shapefile
# in decimal degrees shapefile with ogr2ogr in command line
utm_outfile
=
"{0}/UTM_{1}"
.
format
(
os
.
path
.
split
(
self
.
_box
)[
0
],
os
.
path
.
split
(
self
.
_box
)[
1
])
if
os
.
path
.
exists
(
utm_outfile
):
os
.
remove
(
utm_outfile
)
process_tocall
=
[
'ogr2ogr'
,
'-f'
,
'Esri Shapefile'
,
'-t_srs'
,
'EPSG:32631'
,
utm_outfile
,
self
.
_box
]
subprocess
.
call
(
process_tocall
)
# To get shapefile extent
# Avec import ogr
driver
=
ogr
.
GetDriverByName
(
'ESRI Shapefile'
)
# Open shapefile
data_source
=
driver
.
Open
(
utm_outfile
,
0
)
if
data_source
is
None
:
self
.
logger
.
error
(
'Could not open file'
)
sys
.
exit
(
1
)
shp_ogr
=
data_source
.
GetLayer
()
# Extent
extent_
=
shp_ogr
.
GetExtent
()
# (xMin, xMax, yMin, yMax)
## Close source data
data_source
.
Destroy
()
# Coordinates min in decimal degrees
LL_min
=
self
.
utm_to_latlng
(
31
,
extent_
[
0
],
extent_
[
2
],
northernHemisphere
=
True
)
# Coordinates max in decimal degrees
LL_max
=
self
.
utm_to_latlng
(
31
,
extent_
[
1
],
extent_
[
3
],
northernHemisphere
=
True
)
# AdressUrl = 'http://spirit.cnes.fr/resto/Landsat/?format=html&lang=fr&q=2013&box=' + str(LL_Min[0]) + ',' + str(LL_Min[1]) + ',' + str(LL_Max[0]) + ',' + str(LL_Max[1])
area_coord_corner
=
str
(
LL_min
[
0
])
+
','
+
str
(
LL_min
[
1
])
+
','
+
str
(
LL_max
[
0
])
+
','
+
str
(
LL_max
[
1
])
return
area_coord_corner
def
listing
(
self
):
"""
Function to list available archive on plateform Theia Land, and on the area
"""
nbImage
=
0
# Loop on the years
self
.
logger
.
info
(
"Images availables"
)
for
year
in
self
.
_list_year
:
self
.
logger
.
info
(
"=============== {0} ==============="
.
format
(
year
))
self
.
url
=
"{0}/{1}/api/collections/{2}/search.json?lang=fr&processingLevel={3}&_pretty=true&q={4}&box={5}&maxRecord=500"
.
format
(
self
.
server
,
self
.
resto
,
self
.
_captor
,
self
.
niveau
,
year
,
self
.
coord_box_dd
())
# Initialisation variable for a next page
# There is a next page, next = 1
# There isn't next page, next = 0
next_
=
1
# To know path to download images
while
next_
==
1
:
try
:
request_headers
=
{
"User-Agent"
:
"Magic-browser"
}
req
=
urllib
.
request
.
Request
(
self
.
url
,
headers
=
request_headers
)
# Connexion in the database
data
=
urllib
.
request
.
urlopen
(
req
).
read
()
# Read in the database
new_data
=
re
.
sub
(
b
"null"
,
b
"'null'"
,
data
)
# Remove "null" because Python don't like
new_data
=
re
.
sub
(
b
"false"
,
b
"False"
,
new_data
)
# Remove "false" and replace by False (Python know False with a capital letter F)
# Transform the data in dictionary
data_Dict
=
defaultdict
(
list
)
data_Dict
=
UserDict
(
eval
(
new_data
))
# Select archives to download
for
d
in
range
(
len
(
data_Dict
[
'features'
])):
name_archive
=
data_Dict
[
'features'
][
d
][
'properties'
][
'productIdentifier'
]
feature_id
=
data_Dict
[
"features"
][
d
][
'id'
]
link_archive
=
data_Dict
[
'features'
][
d
][
'properties'
][
'services'
][
'download'
][
'url'
].
replace
(
"
\\
"
,
""
)
url_archive
=
link_archive
.
replace
(
self
.
resto
,
"rocket/#"
)
archive_download
=
url_archive
.
replace
(
"/download"
,
""
)
# Path to download
out_archive
=
"{0}/{1}.tgz"
.
format
(
self
.
_repertory
,
name_archive
)
# Name after download
# if len(self.list_archive) == 0 : self.list_archive.append([archive_download, out_archive, feature_id])
if
"SENTINEL2X"
not
in
out_archive
or
self
.
niveau
==
"LEVEL3A"
:
self
.
list_archive
.
append
([
archive_download
,
out_archive
,
feature_id
])
# Verify if there is another page (next)
for
link
in
data_Dict
[
'properties'
][
'links'
]
:
if
link
[
"title"
]
is
'suivant'
:
self
.
url
=
link
[
'href'
].
replace
(
"
\\
"
,
""
)
next_
=
1
break
else
:
next_
=
0
except
Exception
as
e
:
self
.
logger
.
error
(
"Error connexion or error variable : {0}"
.
format
(
e
))
sys
.
exit
(
1
)
if
nbImage
!=
len
(
self
.
list_archive
)
:
if
not
os
.
path
.
exists
(
"{}/{}/Images"
.
format
(
self
.
_repertory
,
year
))
:
os
.
makedirs
(
"{}/{}/Images"
.
format
(
self
.
_repertory
,
year
))
nbImage
=
len
(
self
.
list_archive
)
self
.
logger
.
info
(
"{0} image(s) matched the criteria."
.
format
(
len
(
self
.
list_archive
)))
def
download_auto
(
self
,
user
,
password
,
proxy
=
""
):
"""
Function to download images archive automatically on Theia land data center.
"""
url
=
"{0}/services/authenticate/"
.
format
(
self
.
server
)
proxyDict
=
{
"http"
:
"{0}"
.
format
(
proxy
),
\
"https"
:
"{0}"
.
format
(
proxy
),
\
"ftp"
:
"{0}"
.
format
(
proxy
)
\
}
payload
=
{
'ident'
:
"{0}"
.
format
(
user
),
\
'pass'
:
"{0}"
.
format
(
password
)
}
reponse
=
requests
.
post
(
url
,
data
=
payload
,
proxies
=
proxyDict
)
try
:
if
"text"
in
reponse
.
headers
[
"Content-Type"
]
:
token
=
reponse
.
text
elif
"json"
in
reponse
.
headers
[
"Content-Type"
]
:
token
=
reponse
.
json
()[
"access_token"
]
else
:
raise
(
BaseException
(
'Format non traité.'
))
except
Exception
as
e
:
self
.
logger
.
error
(
"Error during the identification request"
)
sys
.
exit
(
-
1
)
head
=
{
"Authorization"
:
"Bearer {0}"
.
format
(
token
)}
regex
=
'SENTINEL(.+?)_(.+?)-'
dico_date
=
dict
()
sauvegarde
=
configparser
.
ConfigParser
()
for
l
in
self
.
list_archive
:
date
=
re
.
search
(
regex
,
l
[
1
]).
group
(
2
)
nom_image
=
l
[
1
].
split
(
"/"
)[
-
1
][:
-
4
]
sauvegarde
.
read
(
"{}/{}/sauvegarde.ini"
.
format
(
self
.
_repertory
,
date
[:
4
]))
if
str
(
date
)
in
sauvegarde
.
keys
()
:
if
nom_image
not
in
sauvegarde
[
str
(
date
)].
keys
()
:
sauvegarde
[
str
(
date
)][
nom_image
]
=
"False"
sauvegarde
[
str
(
date
)][
"NDVI"
]
=
"False"
if
date
in
dico_date
.
keys
()
:
dico_date
[
date
].
append
(
l
)
else
:
dico_date
[
date
]
=
[
l
]
elif
sauvegarde
[
str
(
date
)][
nom_image
]
==
"False"
:
sauvegarde
[
str
(
date
)][
"NDVI"
]
=
"False"
if
date
in
dico_date
.
keys
()
:
dico_date
[
date
].
append
(
l
)
else
:
dico_date
[
date
]
=
[
l
]
else
:
sauvegarde
[
str
(
date
)]
=
{}
sauvegarde
[
str
(
date
)][
nom_image
]
=
"False"
sauvegarde
[
str
(
date
)][
"NDVI"
]
=
"False"
if
date
in
dico_date
.
keys
()
:
dico_date
[
date
].
append
(
l
)
else
:
dico_date
[
date
]
=
[
l
]
with
open
(
"{}/{}/sauvegarde.ini"
.
format
(
self
.
_repertory
,
date
[:
4
]),
'w'
)
as
configfile
:
sauvegarde
.
write
(
configfile
)
for
cle
in
sorted
(
dico_date
):
liste_content
=
[]
for
idx
,
img
in
enumerate
(
dico_date
[
cle
]):
self
.
logger
.
info
(
"Requête pour l'archive : {0}"
.
format
(
img
[
1
].
split
(
"/"
)[
-
1
]))
url
=
"{0}/{1}/collections/{2}/{3}/download/?issuerId=theia"
.
format
(
self
.
server
,
self
.
resto
,
self
.
_captor
,
img
[
2
])
self
.
logger
.
debug
(
"url : {}"
.
format
(
url
))
reponse
=
requests
.
get
(
url
,
headers
=
head
,
proxies
=
proxyDict
)
liste_content
.
append
(
reponse
.
content
)
del
reponse
self
.
pourc_cloud
(
cle
,
liste_content
)
del
liste_content
sauvegarde
.
read
(
"{}/{}/sauvegarde.ini"
.
format
(
self
.
_repertory
,
cle
[:
4
]))
for
img
in
dico_date
[
cle
]
:
sauvegarde
[
str
(
cle
)][
img
[
1
].
split
(
"/"
)[
-
1
][:
-
4
]]
=
"True"
with
open
(
"{}/{}/sauvegarde.ini"
.
format
(
self
.
_repertory
,
cle
[:
4
]),
'w'
)
as
configfile
:
sauvegarde
.
write
(
configfile
)
self
.
logger
.
info
(
"All images have been downloaded !"
)
def
calcul_aire
(
self
,
tuiles_image
):
option_warp
=
gdal
.
WarpOptions
(
format
=
'Mem'
,
resampleAlg
=
"lanczos"
)
mosaic_image
=
gdal
.
Warp
(
""
,
tuiles_image
,
options
=
option_warp
)
rows
=
mosaic_image
.
RasterYSize
# Rows number
cols
=
mosaic_image
.
RasterXSize
# Columns number
data
=
mosaic_image
.
GetRasterBand
(
1
).
ReadAsArray
(
0
,
0
,
cols
,
rows
).
astype
(
np
.
float16
)
mask_spec
=
np
.
isin
(
data
,
[
-
10000
,
math
.
isnan
],
invert
=
True
)
area
=
float
((
np
.
sum
(
mask_spec
)
*
mosaic_image
.
GetGeoTransform
()[
1
]
*
abs
(
mosaic_image
.
GetGeoTransform
()[
-
1
])
)
/
10000
)
self
.
logger
.
info
(
"Aire = {0} ha"
.
format
(
area
))
return
mosaic_image
,
area
def
ecriture_geotiff
(
self
,
dataset
,
chemin
):
gdal
.
AllRegister
()
driver
=
gdal
.
GetDriverByName
(
'GTiff'
)
outdata
=
driver
.
Create
(
chemin
,
dataset
.
RasterXSize
,
dataset
.
RasterYSize
,
dataset
.
RasterCount
,
gdal
.
GDT_Float32
)
outdata
.
SetGeoTransform
(
dataset
.
GetGeoTransform
())
outdata
.
SetProjection
(
dataset
.
GetProjection
())
for
band
in
range
(
dataset
.
RasterCount
)
:
outdata
.
GetRasterBand
(
band
+
1
).
WriteArray
(
\
dataset
.
GetRasterBand
(
band
+
1
).
ReadAsArray
(
0
,
0
,
dataset
.
RasterXSize
,
dataset
.
RasterYSize
).
astype
(
np
.
float32
)
/
10000.0
,
\
0
,
0
)
outdata
.
GetRasterBand
(
band
+
1
).
FlushCache
()
outdata
=
None
def
pourc_cloud
(
self
,
date
,
liste_content
):
self
.
logger
.
info
(
"Date : {0} -> {1} image(s)"
.
format
(
date
,
len
(
liste_content
)))
extent_img
=
[
'_FRC_B2.tif'
,
'_FRC_B3.tif'
,
'_FRC_B4.tif'
,
'_FRC_B8.tif'
]
tuiles_image
=
[]
options_vrt
=
gdal
.
BuildVRTOptions
(
separate
=
True
)
options_translate
=
gdal
.
TranslateOptions
(
format
=
"Mem"
)
self
.
logger
.
info
(
"Extraction des images"
)
for
idx
,
content
in
enumerate
(
liste_content
)
:
tzip
=
zipfile
.
ZipFile
(
io
.
BytesIO
(
content
))
# Images list in the archive
zip_img
=
tzip
.
namelist
()
liste_bandes
=
[]
liste_mem
=
[]
for
id_ext
,
extension
in
enumerate
(
extent_img
)
:
img
=
[
f
for
f
in
zip_img
if
extension
in
f
][
0
]
mmap_name
=
"/vsimem/"
+
uuid4
().
hex
liste_mem
.
append
(
mmap_name
)
gdal
.
FileFromMemBuffer
(
mmap_name
,
tzip
.
read
(
img
))
liste_bandes
.
append
(
gdal
.
Open
(
mmap_name
))
vrt
=
gdal
.
BuildVRT
(
""
,
liste_bandes
,
options
=
options_vrt
)
tuiles_image
.
append
(
Outils
.
clip
(
gdal
.
Translate
(
""
,
vrt
,
options
=
options_translate
),
self
.
emprise
))
for
mmap_name
in
liste_mem
:
gdal
.
Unlink
(
mmap_name
)
liste_bandes
=
None
del
tzip
liste_content
[
idx
]
=
None
del
liste_content
self
.
logger
.
info
(
"Calcul de l'aire"
)
image
,
aire
=
self
.
calcul_aire
(
tuiles_image
)
del
tuiles_image
if
aire
>
0.0
:
self
.
logger
.
info
(
"Sauvegarde des images"
)
dossier
=
"{0}/{1}/Images"
.
format
(
self
.
_repertory
,
date
[:
4
])
self
.
logger
.
debug
(
"Dossier image : {0}"
.
format
(
dossier
))
self
.
ecriture_geotiff
(
image
,
"{0}/{1}.tif"
.
format
(
dossier
,
date
))
del
image
app/Constantes.py
0 → 100644
View file @
60ed6d35
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import
logging
NIVEAU_DEFAUT
=
logging
.
DEBUG
CLOUD_THRESHOLD
=
0.0
EPSG_PHYMOBAT
=
2154
app/Outils.py
0 → 100644
View file @
60ed6d35
#!/bin/env python3
# -*- coding: utf-8 -*-
import
argparse
import
logging
from
logging.handlers
import
RotatingFileHandler
import
signal
from
contextlib
import
contextmanager
import
inspect
from
osgeo
import
gdal
import
Constantes
class
Fusion
(
argparse
.
Action
):
def
__init__
(
self
,
option_strings
,
dest
,
nargs
=
None
,
**
kwargs
):
super
(
Fusion
,
self
).
__init__
(
option_strings
,
dest
,
nargs
,
**
kwargs
)
def
__call__
(
self
,
parser
,
namespace
,
values
,
option_string
=
None
):
setattr
(
namespace
,
self
.
dest
,
' '
.
join
(
values
).
lower
())
class
Log
(
object
):
def
__init__
(
self
,
dossier
,
nomFichier
,
niveau
=
None
):
super
(
Log
,
self
).
__init__
()
stack
=
inspect
.
stack
()
nom_classe
=
stack
[
1
][
0
].
f_locals
[
"self"
].
__class__
.
__name__
self
.
__logger__
=
logging
.
getLogger
(
nomFichier
)
if
niveau
is
None
:
try
:
niveau
=
Constantes
.
NIVEAU_DEFAUT
except
:
niveau
=
logging
.
DEBUG
self
.
__logger__
.
setLevel
(
niveau
)
format_fichier
=
logging
.
Formatter
(
'%(asctime)s :: %(levelname)s :: %(message)s'
,
datefmt
=
'%d-%m-%Y %H:%M:%S'
)
fichierLog
=
RotatingFileHandler
(
"{0}/{1}.log"
.
format
(
dossier
,
nomFichier
),
'a'
,
1000000
,
1
)
fichierLog
.
setLevel
(
niveau
)
fichierLog
.
setFormatter
(
format_fichier
)
self
.
__logger__
.
addHandler
(
fichierLog
)
format_console
=
logging
.
Formatter
(
'%(asctime)s :: {0} :: %(levelname)s :: %(message)s'
.
format
(
nom_classe
),
datefmt
=
'%d-%m-%Y %H:%M:%S'
)
console
=
logging
.
StreamHandler
()
console
.
setLevel
(
niveau
)
console
.
setFormatter
(
format_console
)
self
.
__logger__
.
addHandler
(
console
)
def
info
(
self
,
message
):
self
.
__logger__
.
info
(
message
)
def
debug
(
self
,
message
):
self
.
__logger__
.
debug
(
message
)
def
warning
(
self
,
message
):
self
.
__logger__
.
warning
(
message
)
def
error
(
self
,
message
):
self
.
__logger__
.
error
(
message
)
def
critical
(
self
,
message
):
self
.
__logger__
.
critical
(
message
)
def
exception
(
self
,
message
):
self
.
__logger__
.
exception
(
message
)
def
close
(
self
):
for
handler
in
self
.
__logger__
.
handlers
[:]
:
handler
.
close
()
self
.
__logger__
.
removeHandler
(
handler
)
class
TimeoutException
(
Exception
):
pass
@
contextmanager
def
limitation_temporelle
(
secondes
):
def
signal_handler
(
signum
,
frame
):
raise
TimeoutException
signal
.
signal
(
signal
.
SIGALRM
,
signal_handler
)
signal
.
alarm
(
secondes
)
try
:
yield
finally
:
signal
.
alarm
(
0
)
# Utilisation :
#
# try:
# with limitation_temporelle(temps_en_seconde):
# appel_fonction()
# except TimeoutException:
# pass
def
clip
(
image
,
cut
,
form
=
"Mem"
,
dst
=
""
):
option_clip
=
gdal
.
WarpOptions
(
cropToCutline
=
True
,
\
cutlineDSName
=
cut
,
outputType
=
gdal
.
GDT_Float32
,
format
=
form
,
dstNodata
=
'NaN'
)
return
gdal
.
Warp
(
dst
,
image
,
options
=
option_clip
)
\ No newline at end of file
app/Processing.py
0 → 100644
View file @
60ed6d35
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import
os
import
configparser
import
datetime
import
glob
from
osgeo
import
gdal
import
otbApplication
as
otb
from
app
import
Archive
def
str2bool
(
v
):
return
v
.
lower
()
in
(
"true"
)
class
Processing
(
object
):
def
__init__
(
self
):
pass
def
i_download
(
self
):
"""
Méthode pour télécharger les images sur le site Theia Land.
"""
if
not
self
.
annee_fin
:
self
.
annee_fin
=
datetime
.
datetime
.
now
().
year
self
.
check_download
=
Archive
.
Archive
(
self
.
capteur
,
self
.
niveau
,
self
.
emprise
,
self
.
resultats
,
self
.
annee_debut
,
self
.
annee_fin
)
self
.
check_download
.
listing
()
self
.
check_download
.
download_auto
(
self
.
id
,
self
.
mdp
,
self
.
proxy
)
self
.
liste_dossier
=
dict
()
for
annee
in
os
.
listdir
(
self
.
resultats
):
with
open
(
"{}/{}/dates.txt"
.
format
(
self
.
resultats
,
annee
),
"w"
)
as
fichier
:
for
image
in
sorted
(
glob
.
glob
(
"{}/{}/Images/*"
.
format
(
self
.
resultats
,
annee
)))
:
fichier
.
write
(
"{}
\n
"
.
format
(
os
.
path
.
basename
(
image
)[:
-
4
]))
self
.
liste_dossier
[
annee
]
=
sorted
([
x
for
x
in
glob
.
glob
(
"{}/{}/Images/*"
.
format
(
self
.
resultats
,
annee
))
if
x
.
endswith
(
".tif"
)])
def
otbPhenologie
(
self
):
otb_Phenologie
=
otb
.
Registry
.
CreateApplication
(
"SigmoFitting"
)
otb_Phenologie
.
SetParameterString
(
"mode"
,
"metrics"
)
for
annee
in
self
.
liste_dossier
:
otb_Phenologie
.
SetParameterString
(
"in"
,
"{}/{}/NDVI/stack_ndvi.tif"
.
format
(
self
.
resultats
,
annee
))
otb_Phenologie
.
SetParameterString
(
"dates"
,
"{}/{}/dates.txt"
.
format
(
self
.
resultats
,
annee
))
otb_Phenologie
.
SetParameterString
(
"out"
,
"{}/{}/metrics.tif"
.
format
(
self
.
resultats
,
annee
))