Commit f3dff8e9 authored by Cresson Remi's avatar Cresson Remi
Browse files

ENH: S2A and S3A support

1 merge request!12ENH: S2A and S3A support
Pipeline #33438 passed with stages
in 1 minute and 27 seconds
Showing with 373 additions and 9 deletions
+373 -9
......@@ -57,13 +57,23 @@ class Source(pyotb.Output):
"mode": "binary",
"mode.binary.foreground": 0 if inside else 255,
"background": 255 if inside else 255})
manage_nodata = pyotb.ManageNoData({"in": self,
"mode": "apply",
"mode.apply.mask": rasterization,
"mode.apply.ndval": nodata})
return self.new_source(rasterization, manage_nodata)
return self.masked(binary_mask=rasterization, nodata=nodata)
return self # Nothing but a soft copy of the source
def masked(self, binary_mask, nodata=0):
"""
Return the source masked from an uint8 binary raster (0 or 1..255).
Pixels are set to "nodata" where the mask values are 0.
:param binary_mask: input vector data filename
:param nodata: nodata value for rejected values
:return: masked source
"""
manage_nodata = pyotb.ManageNoData({"in": self,
"mode": "apply",
"mode.apply.mask": binary_mask,
"mode.apply.ndval": nodata})
return self.new_source(binary_mask, manage_nodata)
def resample_over(self, ref_img, interpolator="nn", nodata=0):
"""
Return the source superimposed over the input image
......@@ -112,7 +122,7 @@ class Source(pyotb.Output):
return self # Nothing but a soft copy of the source
class Imagery:
class Imagery(ABC):
"""
Imagery class.
This class carry the base image source, and additional generic stuff common to all sensors imagery.
......
......@@ -12,7 +12,7 @@ def find_all_dimaps(pth):
:param pth: root directory
:return: list of DIMAPS
"""
return utils.find_file_in_all_subdirs(pth=pth, pattern="DIM_*.XML")
return utils.find_files_in_all_subdirs(pth=pth, pattern="DIM_*.XML")
def get_spot67_scenes(root_dir):
......
"""
Sentinel-2 root_scene class
"""
import datetime
from abc import abstractmethod
import pyotb
from scenes import utils
from scenes.core import Source, Imagery, Scene
class Sentinel2SourceBase(Source):
"""
Base class for Sentinel-2 sources
"""
def msk_drilled(self, msk_dict, exp, nodata=0):
"""
:param msk_dict: dict of masks
:param exp: bandmath expression to form the 0-255 binary mask
:param nodata: no-data value in masked output
:return: new masked source
"""
img_size = pyotb.ReadImageInfo(self).GetParameterInt('sizex')
bm = pyotb.BandMath({"il": msk_dict[img_size], "exp": exp})
return self.masked(binary_mask=bm, nodata=nodata)
class Sentinel22ASource(Sentinel2SourceBase):
"""
Sentinel-2 level 2A source class
"""
def cld_msk_drilled(self, nodata=0):
"""
Return the source drilled from the cloud mask
:param nodata: nodata value inside holes
:return: drilled source
"""
return self.msk_drilled(msk_dict={10980: self.root_imagery.root_scene.cld_r1_msk_file,
5490: self.root_imagery.root_scene.cld_r2_msk_file},
exp="im1b1==0?255:0",
nodata=nodata)
class Sentinel23ASource(Sentinel2SourceBase):
"""
Sentinel-2 level 3A source class
"""
def flg_msk_drilled(self, keep_flags_values=(3, 4), nodata=0):
"""
Return the source drilled from the FLG mask
:param keep_flags_values: flags values to keep
:param nodata: nodata value inside holes
:return: drilled source
"""
exp = "||".join(["im1b1=={}".format(val) for val in keep_flags_values]) + "?255:0"
return self.msk_drilled(msk_dict={10980: self.root_imagery.root_scene.cld_r1_msk_file,
5490: self.root_imagery.root_scene.cld_r2_msk_file},
exp=exp,
nodata=nodata)
class Sentinel2ImageryBase(Imagery):
"""
Base class for Sentinel-2 level 2A imagery classes.
"""
def __init__(self, root_scene):
"""
:param root_scene: The Scene of which the Imagery instance is attached
"""
super().__init__(root_scene=root_scene)
def get_10m_bands(self):
"""
:return: 10m spacing bands source
"""
return Source(self, pyotb.ConcatenateImages([self.root_scene.band4_file,
self.root_scene.band3_file,
self.root_scene.band2_file,
self.root_scene.band8_file]))
def get_20m_bands(self):
"""
:return: 20m spacing bands source
"""
return Source(self, pyotb.ConcatenateImages([self.root_scene.band5_file,
self.root_scene.band6_file,
self.root_scene.band7_file,
self.root_scene.band8a_file,
self.root_scene.band11_file,
self.root_scene.band12_file]))
class Sentinel22AImagery(Sentinel2ImageryBase):
"""
Sentinel-2 level 2A class.
"""
def get_clm_10m(self):
"""
:return: the 10m CLM mask
"""
return Source(self, self.root_scene.clm_r1_msk_file)
def get_edg_10m(self):
"""
:return: the 10m EDG mask
"""
return Source(self, self.root_scene.edg_r1_msk_file)
def get_clm_20m(self):
"""
:return: the 20m CLM mask
"""
return Source(self, self.root_scene.clm_r2_msk_file)
def get_edg_20m(self):
"""
:return: the 20m EDG mask
"""
return Source(self, self.root_scene.edg_r2_msk_file)
class Sentinel23AImagery(Sentinel2ImageryBase):
"""
Sentinel-2 level 2A class.
"""
def get_flg_10m(self):
"""
:return: the 10m CLM mask
"""
return Source(self, self.root_scene.flg_r1_msk_file)
def get_flg_20m(self):
"""
:return: the 20m CLM mask
"""
return Source(self, self.root_scene.flg_r2_msk_file)
class Sentinel2SceneBase(Scene):
"""
Base class for Sentinel-2 images
"""
@abstractmethod
def __init__(self, archive, tag):
"""
:param archive: product .zip or directory
"""
self.archive = archive
# Retrieve the list of .tif files
is_zip = self.archive.lower().endswith(".zip")
if is_zip:
print("Input type is a .zip archive")
files = utils.list_files_in_zip(self.archive)
self.files = [utils.to_vsizip(self.archive, f) for f in files]
else:
print("Input type is a directory")
self.files = utils.find_files_in_all_subdirs(self.archive, "*.tif", case_sensitive=False)
# Assign bands files
self.band2_file = self.get_band(tag, "B2")
self.band3_file = self.get_band(tag, "B3")
self.band4_file = self.get_band(tag, "B4")
self.band8_file = self.get_band(tag, "B8")
self.band5_file = self.get_band(tag, "B5")
self.band6_file = self.get_band(tag, "B6")
self.band7_file = self.get_band(tag, "B7")
self.band8a_file = self.get_band(tag, "B8A")
self.band11_file = self.get_band(tag, "B11")
self.band12_file = self.get_band(tag, "B12")
# EPSG, bbox
epsg, _, bbox_wgs84 = utils.get_epsg_extent_bbox(self.band2_file)
# Date
onefile = utils.basename(self.band2_file) # SENTINEL2A_20180630-105440-000_L2A_T31TEJ_D_V1-8
datestr = onefile.split("_")[1] # 20180630-105440
acquisition_date = datetime.datetime.strptime(datestr, '%Y%m%d-%H%M%S-%f')
# Call parent constructor
super().__init__(acquisition_date=acquisition_date, bbox_wgs84=bbox_wgs84, epsg=epsg)
def get_file(self, endswith):
"""
Return the specified file.
Throw an exception if none or multiple candidates are found.
:param endswith: filtered extension
:return: the file
"""
filtered_files_list = [f for f in self.files if f.endswith(endswith)]
nb_matches = len(filtered_files_list)
if nb_matches != 1:
raise ValueError("Cannot instantiate Sentinel-2 Theia product from {}:"
"{} occurrence(s) of file with suffix \"{}\"".format(self.archive, nb_matches, endswith))
return filtered_files_list[0]
def get_band(self, suffix1, suffix2):
"""
Return the file path for the specified band.
Throw an exception if none or multiple candidates are found.
:param suffix1: suffix 1, e.g. "FRE"
:param suffix2: suffix 2, e.g. "B3"
:return: file path for the band
"""
return self.get_file(endswith="_{}_{}.tif".format(suffix1, suffix2))
def get_metadata(self):
"""
Return the metadata
:return: metadata (dict)
"""
metadata = super().get_metadata()
metadata.update({
"Archive": self.archive,
"Band 2": self.band2_file,
"Band 3": self.band3_file,
"Band 4": self.band4_file,
"Band 8": self.band8_file,
"Band 5": self.band5_file,
"Band 6": self.band6_file,
"Band 7": self.band7_file,
"Band 8a": self.band8a_file,
"Band 11": self.band11_file,
"Band 12": self.band12_file,
})
return metadata
class Sentinel22AScene(Sentinel2SceneBase):
"""
Sentinel-2 level 2A scene class.
The class carries all the metadata from the root_scene, and can be used to retrieve its imagery.
"""
def __init__(self, archive):
"""
:param archive: .zip file or folder. Must be a product from MAJA.
"""
# Call parent constructor
super().__init__(archive=archive, tag="FRE")
# Additional rasters
self.clm_r1_msk_file = self.get_file("_CLM_R1.tif")
self.edg_r1_msk_file = self.get_file("_EDG_R1.tif")
self.clm_r2_msk_file = self.get_file("_CLM_R2.tif")
self.edg_r2_msk_file = self.get_file("_EDG_R2.tif")
def get_imagery(self): # pylint: disable=arguments-differ
"""
Return the Sentinel-2 level 2A imagery
:return: Imagery instance
"""
return Sentinel22AImagery(self)
def get_metadata(self):
"""
Return the metadata
:return: metadata (dict)
"""
metadata = super().get_metadata()
metadata.update({
"CLD R1": self.clm_r1_msk_file,
"EDG R1": self.edg_r1_msk_file,
"CLD R2": self.clm_r2_msk_file,
"EDG R2": self.edg_r2_msk_file,
})
return metadata
class Sentinel23AScene(Sentinel2SceneBase):
"""
Sentinel-2 level 3A scene class.
The class carries all the metadata from the root_scene, and can be used to retrieve its imagery.
"""
def __init__(self, archive):
"""
:param archive: .zip file or folder. Must be a product from WASP.
"""
super().__init__(archive=archive, tag="FRC")
# Additional rasters
self.flg_r1_msk_file = self.get_file("_FLG_R1.tif")
self.flg_r2_msk_file = self.get_file("_FLG_R2.tif")
# Call parent constructor
super().__init__()
def get_imagery(self): # pylint: disable=arguments-differ
"""
Return the Sentinel-2 level 3A imagery
:return: Imagery instance
"""
return Sentinel23AImagery(self)
def get_metadata(self):
"""
Return the metadata
:return: metadata (dict)
"""
metadata = super().get_metadata()
metadata.update({
"FLG R1": self.flg_r1_msk_file,
"FLG R2": self.flg_r2_msk_file,
})
return metadata
......@@ -5,6 +5,9 @@ import os
import glob
import pathlib
import math
import zipfile
import re
import fnmatch
from osgeo import osr, ogr, gdal
......@@ -204,14 +207,19 @@ def get_bbox_wgs84_from_vector(vector_file):
return xmin, xmax, ymin, ymax
def find_file_in_all_subdirs(pth, pattern):
def find_files_in_all_subdirs(pth, pattern, case_sensitive=True):
"""
Returns the list of files matching the pattern in all subdirectories of pth
:param pth: path
:param pattern: pattern
:param case_sensitive: case sensitive True or False
:return: list of str
"""
return [y for x in os.walk(pth) for y in glob.glob(os.path.join(x[0], pattern))]
result = []
reg_expr = re.compile(fnmatch.translate(pattern), 0 if case_sensitive else re.IGNORECASE)
for root, _, files in os.walk(pth, topdown=True):
result += [os.path.join(root, j) for j in files if re.match(reg_expr, j)]
return result
def find_file_in_dir(pth, pattern):
......@@ -234,3 +242,37 @@ def get_parent_directory(pth):
if not path:
raise Exception("Cannot define path: {}".format(path))
return str(path.parent)
def list_files_in_zip(filename, endswith=None):
"""
List files in zip archive
:param filename: path of the zip
:param endswith: optional, end of filename to be matched
:return: list of the filepaths
"""
with zipfile.ZipFile(filename) as zip_file:
filelist = zip_file.namelist()
if endswith:
filelist = [f for f in filelist if f.endswith(endswith)]
return filelist
def to_vsizip(zipfn, relpth):
"""
Create path from zip file
:param zipfn: zip archive
:param relpth: relative path (inside archive)
:return: vsizip path
"""
return "/vsizip/{}/{}".format(zipfn, relpth)
def basename(pth):
"""
Returns the basename. Works with files and paths
:param pth: path
:return: basename of the path
"""
return str(pathlib.Path(pth).name)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment