vhrbase.py 7.52 KiB
import otbApplication as otb
from Common.otb_numpy_proc import to_otb_pipeline
import os
import glob
from pathlib import Path
import re
from osgeo import ogr

class SPOT67RasterPipeline:
    # BEGIN SPOT6/7 VHR PROTOTYPE
    MS_FOLDER_PATTERN = 'IMG_SPOT*_MS_*'
    PAN_FOLDER_PATTERN = 'IMG_SPOT*_P_*'
    TILE_PATTERN = '*_R*C*.TIF'
    NDT = 0.0
    REF_TYPE = otb.ImagePixelType_uint16
    TOA_TYPE = otb.ImagePixelType_float
    GRID_SPC = [6, 24]
    RS_LABEL = 'SEN'
    MS_LABEL = 'MS'
    OPT_IN = '?&skipcarto=true'

    @classmethod
    def _find_tiles(cls, x):
        lst = sorted(glob.glob(os.path.join(x, cls.TILE_PATTERN)))
        res = re.search('R[0-9]C[0-9]', lst[-1]).group()
        return lst, int(res[1]), int(res[3]), res

    def __init__(self, pan_root, ms_root=None):
        self.out_idx = []
        self.pipe = []
        self.files = []
        self.types = []
        self.out_p = []

        self.ms_folder = None
        self.pan_folder = None
        self.pipe = []

        if ms_root is None:
            ms_root = pan_root

        for p in Path(pan_root).rglob(self.PAN_FOLDER_PATTERN):
            if os.path.isdir(p):
                self.pan_folder = str(p)
                break

        for p in Path(ms_root).rglob(self.MS_FOLDER_PATTERN):
            if os.path.isdir(p):
                self.ms_folder = str(p)
                break

        if self.ms_folder is None or self.pan_folder is None:
            raise ValueError('No MS and/or PAN folder found')

        self.pan_list, r_pan, c_pan, res = self._find_tiles(self.pan_folder)
        tile_p = otb.Registry.CreateApplication('TileFusion')
        lst = [x + self.OPT_IN for x in self.pan_list]
        tile_p.SetParameterStringList('il', lst)
        tile_p.SetParameterInt('rows', r_pan)
        tile_p.SetParameterInt('cols', c_pan)
        tile_p.Execute()
        fn = os.path.basename(self.pan_list[-1]).replace(res, 'FULL')
        ty = self.REF_TYPE
        self.append(tile_p, fn, ty, 'out', is_output=True)

        self.ms_list, r_ms, c_ms, res = self._find_tiles(self.ms_folder)
        tile_ms = otb.Registry.CreateApplication('TileFusion')
        lst = [x + self.OPT_IN for x in self.ms_list]
        tile_ms.SetParameterStringList('il', lst)
        tile_ms.SetParameterInt('rows', r_ms)
        tile_ms.SetParameterInt('cols', c_ms)
        tile_ms.Execute()
        fn = os.path.basename(self.ms_list[-1]).replace(res, 'FULL')
        ty = self.REF_TYPE
        self.append(tile_ms, fn, ty, 'out', is_output=True)

    def to_toa(self, clamp=True):
        proc_idx = self.out_idx.copy()
        self.out_idx = []

        for p in proc_idx:
            fn = self.files[p].replace('_FULL', '_FULL_TOA')
            toa = otb.Registry.CreateApplication('OpticalCalibration')
            toa.SetParameterInputImage('in', self.pipe[p].GetParameterOutputImage(self.out_p[p]))
            toa.SetParameterInt('clamp', int(clamp))
            toa.Execute()
            self.append(toa)
            bm = otb.Registry.CreateApplication('BandMathX')
            bm.AddImageToParameterInputImageList('il', self.pipe[-1].GetParameterOutputImage('out'))
            bm.SetParameterString('exp', '10000*im1')
            bm.Execute()
            ty = self.REF_TYPE
            self.append(bm, fn, ty, 'out', is_output=True)
        return

    def orthorectify(self, dem_fld, geoid = None):
        assert (os.path.isdir(dem_fld))
        proc_idx = self.out_idx.copy()
        self.out_idx = []
        gspc_idx = 0
        for t in proc_idx:
            ortho = otb.Registry.CreateApplication('OrthoRectification')
            ortho.SetParameterString('elev.dem', dem_fld)
            if geoid is not None:
                assert (os.path.exists(geoid))
                ortho.SetParameterString('elev.geoid', geoid)
            ortho.SetParameterInputImage('io.in', self.pipe[t].GetParameterOutputImage(self.out_p[t]))
            ortho.SetParameterInt('opt.gridspacing', self.GRID_SPC[gspc_idx])
            ortho.Execute()
            fn = self.files[t].replace(self.RS_LABEL, 'ORTHO')
            ty = self.REF_TYPE
            self.append(ortho, fn, ty, 'io.out', is_output=True)
            gspc_idx += 1

    def pansharp(self):
        proc_idx = self.out_idx.copy()
        self.out_idx = []
        assert(len(proc_idx) == 2)
        btps = otb.Registry.CreateApplication("BundleToPerfectSensor")
        btps.SetParameterInputImage('inp', self.pipe[proc_idx[0]].GetParameterOutputImage(self.out_p[proc_idx[0]]))
        btps.SetParameterInputImage('inxs', self.pipe[proc_idx[1]].GetParameterOutputImage(self.out_p[proc_idx[1]]))
        btps.SetParameterString('method', 'bayes')
        btps.Execute()
        fn = self.files[proc_idx[1]].replace(self.MS_LABEL, 'PS')
        ty = self.REF_TYPE
        self.append(btps, fn, ty, 'out', is_output=True)

    def clip(self, roi, buffer=0):
        assert(os.path.exists(roi))
        proc_idx = self.out_idx.copy()
        self.out_idx = []
        for t in proc_idx:
            er = otb.Registry.CreateApplication('ExtractROI')
            er.SetParameterInputImage('in', self.pipe[t].GetParameterOutputImage(self.out_p[t]))
            er.SetParameterString('mode', 'extent')
            ds = ogr.Open(roi)
            ly = ds.GetLayer(0)
            extent = ly.GetExtent()
            extent = (extent[0] - buffer, extent[1] + buffer, extent[2] - buffer, extent[3] + buffer)
            if ly.GetSpatialRef().GetAuthorityCode('PROJCS') == '4326':
                er.SetParameterString('mode.extent.unit', 'lonlat')
            else:
                er.SetParameterString('mode.extent.unit', 'phy')
            er.SetParameterFloat('mode.extent.ulx', extent[0])
            er.SetParameterFloat('mode.extent.uly', extent[2])
            er.SetParameterFloat('mode.extent.lrx', extent[1])
            er.SetParameterFloat('mode.extent.lry', extent[3])
            ly, ds = None, None
            er.Execute()
            fn = self.files[t]
            ty = self.types[t]
            self.append(er, fn, ty, 'out', is_output=True)

    def append(self, app, fname=None, ftype=None, outp=None, is_output=False):
        if is_output:
            self.out_idx.append(len(self.pipe))
        self.pipe.append(app)
        self.files.append(fname)
        self.types.append(ftype)
        self.out_p.append(outp)

    def write_outputs(self, fld, roi=None, update_pipe=False, compress=False):
        out = []
        out_idx_bck = self.out_idx.copy()
        if roi is not None:
            pipe_length = len(self.pipe)
            self.clip(roi)
        if update_pipe:
            assert roi is None, 'Cannot set output files as pipe input over a ROI.'
            self.out_idx = []
        for t in out_idx_bck:
            out_file = os.path.join(fld, self.files[t])
            if compress:
                out_file += '?gdal:co:compress=deflate'
            if not os.path.exists(os.path.dirname(out_file)):
                os.makedirs(os.path.dirname(out_file))
            self.pipe[t].SetParameterString(self.out_p[t], out_file)
            self.pipe[t].SetParameterOutputImagePixelType(self.out_p[t], self.types[t])
            self.pipe[t].ExecuteAndWriteOutput()
            out.append(out_file)
            if update_pipe:
                self.append(to_otb_pipeline(out_file), self.files[t], self.types[t], 'out', is_output=True)
        if roi is not None:
            self.out_idx = out_idx_bck
            self.pipe = self.pipe[:pipe_length]
            self.files = self.files[:pipe_length]
            self.types = self.types[:pipe_length]
            self.out_p = self.out_p[:pipe_length]
        return out