vhrbase.py 11 KB
Newer Older
import otbApplication as otb
from Common.otb_numpy_proc import to_otb_pipeline
import os
import glob
from pathlib import Path
import re
from osgeo import ogr, gdal
from Common.geometry import compute_displacement
from math import ceil, floor, sqrt

class SPOT67RasterPipeline:
    # BEGIN SPOT6/7 VHR PROTOTYPE
    MS_FOLDER_PATTERN = 'IMG_SPOT*_MS_*'
    PAN_FOLDER_PATTERN = 'IMG_SPOT*_P_*'
    TILE_PATTERN = '*_R*C*.TIF'
    NDT = 0.0
    REF_TYPE = otb.ImagePixelType_uint16
    TOA_TYPE = otb.ImagePixelType_float
    GRID_SPC = [6, 24]
    RS_LABEL = 'SEN'
    MS_LABEL = 'MS'
    OPT_IN = '?&skipcarto=true'

    @classmethod
    def _find_tiles(cls, x):
        lst = sorted(glob.glob(os.path.join(x, cls.TILE_PATTERN)))
        res = re.search('R[0-9]C[0-9]', lst[-1]).group()
        return lst, int(res[1]), int(res[3]), res

    def __init__(self, pan_root, ms_root=None):
        self.out_idx = []
        self.pipe = []
        self.files = []
        self.types = []
        self.out_p = []

        self.ms_folder = None
        self.pan_folder = None
        self.pipe = []

        if ms_root is None:
            ms_root = pan_root

        for p in Path(pan_root).rglob(self.PAN_FOLDER_PATTERN):
            if os.path.isdir(p):
                self.pan_folder = str(p)
                break

        for p in Path(ms_root).rglob(self.MS_FOLDER_PATTERN):
            if os.path.isdir(p):
                self.ms_folder = str(p)
                break

        if self.ms_folder is None or self.pan_folder is None:
            raise ValueError('No MS and/or PAN folder found')

        self.pan_list, r_pan, c_pan, res = self._find_tiles(self.pan_folder)
        tile_p = otb.Registry.CreateApplication('TileFusion')
        lst = [x + self.OPT_IN for x in self.pan_list]
        tile_p.SetParameterStringList('il', lst)
        tile_p.SetParameterInt('rows', r_pan)
        tile_p.SetParameterInt('cols', c_pan)
        tile_p.Execute()
        fn = os.path.basename(self.pan_list[-1]).replace(res, 'FULL')
        ty = self.REF_TYPE
        self.append(tile_p, fn, ty, 'out', is_output=True)

        self.ms_list, r_ms, c_ms, res = self._find_tiles(self.ms_folder)
        tile_ms = otb.Registry.CreateApplication('TileFusion')
        lst = [x + self.OPT_IN for x in self.ms_list]
        tile_ms.SetParameterStringList('il', lst)
        tile_ms.SetParameterInt('rows', r_ms)
        tile_ms.SetParameterInt('cols', c_ms)
        tile_ms.Execute()
        fn = os.path.basename(self.ms_list[-1]).replace(res, 'FULL')
        ty = self.REF_TYPE
        self.append(tile_ms, fn, ty, 'out', is_output=True)

    def to_toa(self, clamp=True):
        proc_idx = self.out_idx.copy()
        self.out_idx = []

        for p in proc_idx:
            fn = self.files[p].replace('_FULL', '_FULL_TOA')
            toa = otb.Registry.CreateApplication('OpticalCalibration')
            toa.SetParameterInputImage('in', self.pipe[p].GetParameterOutputImage(self.out_p[p]))
            toa.SetParameterInt('clamp', int(clamp))
            toa.Execute()
            self.append(toa)
            bm = otb.Registry.CreateApplication('BandMathX')
            bm.AddImageToParameterInputImageList('il', self.pipe[-1].GetParameterOutputImage('out'))
            bm.SetParameterString('exp', '10000*im1')
            bm.Execute()
            ty = self.REF_TYPE
            self.append(bm, fn, ty, 'out', is_output=True)
        return

    def orthorectify(self, dem_fld, geoid = None):
        assert (os.path.isdir(dem_fld))
        proc_idx = self.out_idx.copy()
        self.out_idx = []
        gspc_idx = 0
        for t in proc_idx:
            ortho = otb.Registry.CreateApplication('OrthoRectification')
            ortho.SetParameterString('elev.dem', dem_fld)
            if geoid is not None:
                assert (os.path.exists(geoid))
                ortho.SetParameterString('elev.geoid', geoid)
            ortho.SetParameterInputImage('io.in', self.pipe[t].GetParameterOutputImage(self.out_p[t]))
            ortho.SetParameterInt('opt.gridspacing', self.GRID_SPC[gspc_idx])
            ortho.Execute()
            fn = self.files[t].replace(self.RS_LABEL, 'ORTHO')
            ty = self.REF_TYPE
            self.append(ortho, fn, ty, 'io.out', is_output=True)
            gspc_idx += 1

    def pansharp(self):
        proc_idx = self.out_idx.copy()
        self.out_idx = []
        assert(len(proc_idx) == 2)
        btps = otb.Registry.CreateApplication("BundleToPerfectSensor")
        btps.SetParameterInputImage('inp', self.pipe[proc_idx[0]].GetParameterOutputImage(self.out_p[proc_idx[0]]))
        btps.SetParameterInputImage('inxs', self.pipe[proc_idx[1]].GetParameterOutputImage(self.out_p[proc_idx[1]]))
        btps.SetParameterString('method', 'bayes')
        btps.Execute()
        fn = self.files[proc_idx[1]].replace(self.MS_LABEL, 'PS')
        ty = self.REF_TYPE
        self.append(btps, fn, ty, 'out', is_output=True)

    def rigid_align(self, ref_img, this_band=0, ref_band=2, geobin_radius=32, num_geobins=128, margin=32, filter=5):
        si = otb.Registry.CreateApplication("Superimpose")
        si.SetParameterInputImage('inm', self.pipe[-1].GetParameterOutputImage(self.out_p[-1]))
        si.SetParameterString('inr', ref_img)
        si.Execute()
        sz = si.GetImageSize('out')
        sz = min(sz[0], sz[1])
        geobin_size = 2 * geobin_radius + 1
        geobin_spacing = floor((sz - geobin_size * ceil(sqrt(num_geobins)) - 2 * margin) / 4)
        sh = compute_displacement(si, to_otb_pipeline(ref_img), src_band=this_band, tgt_band=ref_band,
                                  out_param_src='out', out_param_tgt='out',
                                  geobin_size=geobin_size, geobin_spacing=geobin_spacing, margin=margin, filter=filter)
        if sh is not None:
            ref_spc = si.GetImageSpacing('out')
            self.shift = (ref_spc[0]*sh[0][1], ref_spc[1]*sh[0][0])
        return

    def clip(self, roi, buffer=0):
        assert(os.path.exists(roi))
        proc_idx = self.out_idx.copy()
        self.out_idx = []
        for t in proc_idx:
            spc = self.pipe[t].GetImageSpacing(self.out_p[t])
            er = otb.Registry.CreateApplication('ExtractROI')
            er.SetParameterInputImage('in', self.pipe[t].GetParameterOutputImage(self.out_p[t]))
            er.SetParameterString('mode', 'extent')
            ds = ogr.Open(roi)
            ly = ds.GetLayer(0)
            extent = list(ly.GetExtent())
            extent = [extent[0] - buffer, extent[1] + buffer, extent[2] - buffer, extent[3] + buffer]
            if spc[0] < 0:
                tmp = extent[0]
                extent[0] = extent[1]
                extent[1] = tmp
            if spc[1] < 0:
                tmp = extent[2]
                extent[2] = extent[3]
                extent[3] = tmp
            if ly.GetSpatialRef().GetAuthorityCode('PROJCS') == '4326':
                er.SetParameterString('mode.extent.unit', 'lonlat')
            else:
                er.SetParameterString('mode.extent.unit', 'phy')
            er.SetParameterFloat('mode.extent.ulx', extent[0])
            er.SetParameterFloat('mode.extent.uly', extent[2])
            er.SetParameterFloat('mode.extent.lrx', extent[1])
            er.SetParameterFloat('mode.extent.lry', extent[3])
            ly, ds = None, None
            er.Execute()
            fn = self.files[t]
            ty = self.types[t]
            self.append(er, fn, ty, 'out', is_output=True)

    def append(self, app, fname=None, ftype=None, outp=None, is_output=False):
        if is_output:
            self.out_idx.append(len(self.pipe))
        self.pipe.append(app)
        self.files.append(fname)
        self.types.append(ftype)
        self.out_p.append(outp)

    def write_outputs(self, fld, roi=None, update_pipe=False, compress=False):
        out = []
        out_idx_bck = proc_idx = self.out_idx.copy()
        if roi is not None:
            pipe_length = len(self.pipe)
            self.clip(roi)
            proc_idx = self.out_idx.copy()
        if update_pipe:
            assert roi is None, 'Cannot set output files as pipe input over a ROI.'
            self.out_idx = []
            out_file = os.path.join(fld, self.files[t])
            if compress:
                out_file += '?gdal:co:compress=deflate&gdal:co:bigtiff=yes'
            if not os.path.exists(os.path.dirname(out_file)):
                os.makedirs(os.path.dirname(out_file))
            self.pipe[t].SetParameterString(self.out_p[t], out_file)
            self.pipe[t].SetParameterOutputImagePixelType(self.out_p[t], self.types[t])
            self.pipe[t].ExecuteAndWriteOutput()
            if self.shift is not None:
                print("INFO: Applying shift {} to current outputs.".format(self.shift))
                ds = gdal.Open(out_file, 1)
                geot = list(ds.GetGeoTransform())
                geot[0] += self.shift[0]
                geot[3] += self.shift[1]
                ds.SetGeoTransform(tuple(geot))
                ds = None
            out.append(out_file)
            if update_pipe:
                self.append(to_otb_pipeline(out_file), self.files[t], self.types[t], 'out', is_output=True)
        if update_pipe:
            self.shift = None
        if roi is not None:
            self.out_idx = out_idx_bck
            self.pipe = self.pipe[:pipe_length]
            self.files = self.files[:pipe_length]
            self.types = self.types[:pipe_length]
            self.out_p = self.out_p[:pipe_length]

class PleiadesOrthoPipeline(SPOT67RasterPipeline):
    # BEGIN PLEIADES (ORT) VHR PROTOTYPE
    MS_FOLDER_PATTERN = 'IMG_PHR*_MS_*'
    PAN_FOLDER_PATTERN = 'IMG_PHR*_P_*'
    TILE_PATTERN = '*_R*C*.TIF'
    NDT = 0.0
    REF_TYPE = otb.ImagePixelType_uint16
    MS_LABEL = 'MS'
    OPT_IN = ''

class PleiadesOrthoAutoMosaicPipeline:
    def __init__(self, mosaic_root, pattern, out_fld):
        lst = sorted(glob.glob(os.path.join(mosaic_root, pattern)))
        lst = [x for x in lst if os.path.isdir(x)]
        self.scene_list = [PleiadesOrthoPipeline(x) for x in lst]
        self.processed_scenes = []
        self.out_fld = out_fld

    def process_scenes(self, roi=None, compress=False):
        for s in self.scene_list:
            s.to_toa()
            if roi is not None and os.path.exists(roi):
                s.clip(roi)
            s.write_outputs(self.out_fld, update_pipe=True, compress=compress)
            s.pansharp()
            self.processed_scenes.extend(s.write_outputs(self.out_fld, compress=compress))
        return
    
    def generate_mosaic(self, vrt=True):
        if len(self.processed_scenes) > 0:
            lst = self.processed_scenes
            fn = lst[0].split('_')
            fn[-3] = 'MOSAIC'
            fn = '_'.join(fn)
            if vrt is True:
                vrtopt = gdal.BuildVRTOptions()
                fn = fn.replace('.TIF', '.vrt')
                gdal.BuildVRT(fn, lst, options=vrtopt)
            else:
                print('Not implemented yet.')
            return fn
        else:
            print('Preprocess scenes first.')
            return None