s2sen2cor.py 7.39 KiB
from TimeSeries.s2theia import *

class S2Sen2CorTilePipeline(S2TheiaTilePipeline):
    # --- BEGIN SENSOR PROTOTYPE ---

    NAME = 'S2-SEN2COR'
    REF_TYPE = otb.ImagePixelType_uint16
    MSK_TYPE = otb.ImagePixelType_uint8
    PTRN_dir = 'S2*_MSIL2A*'
    PTRN_ref = '_B'
    B2_name = 'B02'
    PTRN_10m = ['GRANULE/*/IMG_DATA/R10m/*_B02_10m.jp2',
                'GRANULE/*/IMG_DATA/R10m/*_B03_10m.jp2',
                'GRANULE/*/IMG_DATA/R10m/*_B04_10m.jp2',
                'GRANULE/*/IMG_DATA/R10m/*_B08_10m.jp2']
    PTRN_20m = ['GRANULE/*/IMG_DATA/R20m/*_B05_20m.jp2',
                'GRANULE/*/IMG_DATA/R20m/*_B06_20m.jp2',
                'GRANULE/*/IMG_DATA/R20m/*_B07_20m.jp2',
                'GRANULE/*/IMG_DATA/R20m/*_B8A_20m.jp2',
                'GRANULE/*/IMG_DATA/R20m/*_B11_20m.jp2',
                'GRANULE/*/IMG_DATA/R20m/*_B12_20m.jp2']
    PTRN_msk = ['GRANULE/*/IMG_DATA/R20m/*_SCL_20m.jp2']
    MERG_msk = ['min']
    PTRN_ful = PTRN_10m[0:3] + PTRN_20m[0:3] + [PTRN_10m[3]] + PTRN_20m[3:]
    FEAT_exp = {
        'B2': 'im1b1',
        'B3': 'im1b2',
        'B4': 'im1b3',
        'B5': 'im1b4',
        'B6': 'im1b5',
        'B7': 'im1b6',
        'B8': 'im1b7',
        'B8A': 'im1b8',
        'B11': 'im1b9',
        'B12': 'im1b10',
        'NDVI': '(im1b7-im1b3)/(im1b7+im1b3+1e-6)',
        'NDWI': '(im1b2-im1b7)/(im1b2+im1b7+1e-6)',
        'BRI': 'sqrt(' + '+'.join(['im1b%d*im1b%d' % (i, i) for i in range(1, 11)]) + ')',
        'MNDWI': '(im1b2-im1b9)/(im1b2+im1b9+1e-6)',
        'SWNDVI': '(im1b9-im1b7)/(im1b9+im1b7+1e-6)',
        'NDRE': '(im1b7-im1b4)/(im1b7+im1b4+1e-6)'
    }
    NDT = 0

    @classmethod
    def _check(cls,x):
        return cls.PTRN_dir.split('_')[-1].replace('*', '') in os.path.basename(x)

    @classmethod
    def _img_id(cls,x):
        if cls._check(x):
            if os.path.isdir(x) or os.path.splitext(x)[-1] == '.zip':
                return '_'.join(os.path.splitext(x)[0].split('_')[-5:])
            else:
                return None
        else:
            return None

    @classmethod
    def _img_date(cls,x):
        if cls._check(x):
            if os.path.isdir(x) or os.path.splitext(x)[-1] == '.zip':
                return os.path.splitext(x)[0].split('_')[-5].split('T')[0]
            else:
                return None
        else:
            return None

    @classmethod
    def _tile_id(cls,x):
        if cls._check(x):
            if os.path.isdir(x) or os.path.splitext(x)[-1] == '.zip':
                return os.path.splitext(x)[0].split('_')[-2]
            else:
                return None
        else:
            return None

    @classmethod
    def _tile_cloud_percentage(cls, x):
        if cls._check(x):
            if os.path.isdir(x):
                fid = open(glob.glob(os.path.join(x, '*/*/MTD_TL.xml'))[0], 'r')
                mtd = fid.read()
            elif os.path.splitext(x)[-1] == '.zip':
                arch = zipfile.ZipFile(x)
                fid = [name for name in arch.namelist() if name.endswith('MTD_TL.xml')]
                mtd = arch.read(fid[0])
            root = et.fromstring(mtd)
            f = filter(lambda x: x.tag == "CLOUDY_PIXEL_PERCENTAGE", root.findall('*/*/*'))
            r = list(f)
            return float(r[0].text)

    """
    def _process_mask(self, msks):
        msk_pipe = [otb.Registry.CreateApplication('Superimpose')]
        msk_pipe[-1].SetParameterInputImage('inr', self.pipe[-1].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterInputImage('inm', msks[0].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterString('interpolator', 'nn')
        msk_pipe[-1].Execute()
        msk_pipe.append(otb.Registry.CreateApplication('BandMath'))
        msk_pipe[-1].AddImageToParameterInputImageList('il', msk_pipe[-2].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterString('exp', 'im1b1==0 || im1b1==1 || im1b1==2 || im1b1==7 ||im1b1==3 || im1b1==8 || im1b1==9 || im1b1==10')
        msk_pipe[-1].Execute()
        msk_pipe.append(otb.Registry.CreateApplication('BinaryMorphologicalOperation'))
        msk_pipe[-1].SetParameterInputImage('in', msk_pipe[-2].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterString('filter', 'erode')
        msk_pipe[-1].SetParameterString('structype', 'ball')
        msk_pipe[-1].SetParameterString('xradius', '5')
        msk_pipe[-1].SetParameterString('yradius', '5')
        msk_pipe[-1].Execute()
        msk_pipe.append(otb.Registry.CreateApplication('BinaryMorphologicalOperation'))
        msk_pipe[-1].SetParameterInputImage('in', msk_pipe[-2].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterString('filter', 'dilate')
        msk_pipe[-1].SetParameterString('structype', 'ball')
        msk_pipe[-1].SetParameterString('xradius', '20')
        msk_pipe[-1].SetParameterString('yradius', '20')
        msk_pipe[-1].Execute()
        return msk_pipe
    """

    def _process_mask(self, msks):
        msk_pipe = [otb.Registry.CreateApplication('BandMath')]
        msk_pipe[-1].AddImageToParameterInputImageList('il', msks[0].GetParameterOutputImage('out'))
        # excluding thin cirrus from masking : no im1b1 == 10 below
        # excluding cloud shadows here (for opening) : no im1b1 == 3 below
        msk_pipe[-1].SetParameterString('exp', 'im1b1==0 || im1b1==1 || im1b1==8 || im1b1==9')
        msk_pipe[-1].Execute()
        msk_pipe.append(otb.Registry.CreateApplication('BandMath'))
        msk_pipe[-1].AddImageToParameterInputImageList('il', msks[0].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterString('exp', 'im1b1==2 || im1b1 == 3 || im1b1==7')
        msk_pipe[-1].Execute()
        msk_pipe.append(otb.Registry.CreateApplication('BinaryMorphologicalOperation'))
        msk_pipe[-1].SetParameterInputImage('in', msk_pipe[-2].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterString('filter', 'opening')
        msk_pipe[-1].SetParameterString('structype', 'ball')
        msk_pipe[-1].SetParameterString('xradius', '5')
        msk_pipe[-1].SetParameterString('yradius', '5')
        msk_pipe[-1].Execute()
        msk_pipe.append(otb.Registry.CreateApplication('BandMath'))
        msk_pipe[-1].AddImageToParameterInputImageList('il', msk_pipe[-4].GetParameterOutputImage('out'))
        msk_pipe[-1].AddImageToParameterInputImageList('il', msk_pipe[-2].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterString('exp', 'im1b1 || im2b1')
        msk_pipe[-1].Execute()
        msk_pipe.append(otb.Registry.CreateApplication('BinaryMorphologicalOperation'))
        msk_pipe[-1].SetParameterInputImage('in', msk_pipe[-2].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterString('filter', 'dilate')
        msk_pipe[-1].SetParameterString('structype', 'ball')
        msk_pipe[-1].SetParameterString('xradius', '20')
        msk_pipe[-1].SetParameterString('yradius', '20')
        msk_pipe[-1].Execute()
        msk_pipe.append(otb.Registry.CreateApplication('Superimpose'))
        msk_pipe[-1].SetParameterInputImage('inr', self.pipe[-1].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterInputImage('inm', msk_pipe[-2].GetParameterOutputImage('out'))
        msk_pipe[-1].SetParameterString('interpolator', 'nn')
        msk_pipe[-1].Execute()
        return msk_pipe

    # ---- END SENSOR PROTOTYPE ----

class S2Sen2CorPipeline(S2TheiaPipeline):

    S2TilePipeline = S2Sen2CorTilePipeline
    _check = S2TilePipeline._check
    _tile_id = S2TilePipeline._tile_id