import otbApplication as otb from Common.otb_numpy_proc import to_otb_pipeline import os import glob from pathlib import Path import re from osgeo import ogr, gdal from Common.geometry import compute_displacement from math import ceil, floor, sqrt class SPOT67RasterPipeline: # BEGIN SPOT6/7 VHR PROTOTYPE MS_FOLDER_PATTERN = 'IMG_SPOT*_MS_*' PAN_FOLDER_PATTERN = 'IMG_SPOT*_P_*' TILE_PATTERN = '*_R*C*.TIF' NDT = 0.0 REF_TYPE = otb.ImagePixelType_uint16 TOA_TYPE = otb.ImagePixelType_float GRID_SPC = [6, 24] RS_LABEL = 'SEN' MS_LABEL = 'MS' OPT_IN = '?&skipcarto=true' @classmethod def _find_tiles(cls, x): lst = sorted(glob.glob(os.path.join(x, cls.TILE_PATTERN))) res = re.search('R[0-9]C[0-9]', lst[-1]).group() return lst, int(res[1]), int(res[3]), res def __init__(self, pan_root, ms_root=None): self.out_idx = [] self.pipe = [] self.files = [] self.types = [] self.out_p = [] self.ms_folder = None self.pan_folder = None self.pipe = [] if ms_root is None: ms_root = pan_root for p in Path(pan_root).rglob(self.PAN_FOLDER_PATTERN): if os.path.isdir(p): self.pan_folder = str(p) break for p in Path(ms_root).rglob(self.MS_FOLDER_PATTERN): if os.path.isdir(p): self.ms_folder = str(p) break if self.ms_folder is None or self.pan_folder is None: raise ValueError('No MS and/or PAN folder found') self.pan_list, r_pan, c_pan, res = self._find_tiles(self.pan_folder) tile_p = otb.Registry.CreateApplication('TileFusion') lst = [x + self.OPT_IN for x in self.pan_list] tile_p.SetParameterStringList('il', lst) tile_p.SetParameterInt('rows', r_pan) tile_p.SetParameterInt('cols', c_pan) tile_p.Execute() fn = os.path.basename(self.pan_list[-1]).replace(res, 'FULL') ty = self.REF_TYPE self.append(tile_p, fn, ty, 'out', is_output=True) self.ms_list, r_ms, c_ms, res = self._find_tiles(self.ms_folder) tile_ms = otb.Registry.CreateApplication('TileFusion') lst = [x + self.OPT_IN for x in self.ms_list] tile_ms.SetParameterStringList('il', lst) tile_ms.SetParameterInt('rows', r_ms) tile_ms.SetParameterInt('cols', c_ms) tile_ms.Execute() fn = os.path.basename(self.ms_list[-1]).replace(res, 'FULL') ty = self.REF_TYPE self.append(tile_ms, fn, ty, 'out', is_output=True) self.shift = None def to_toa(self, clamp=True): proc_idx = self.out_idx.copy() self.out_idx = [] for p in proc_idx: fn = self.files[p].replace('_FULL', '_FULL_TOA') toa = otb.Registry.CreateApplication('OpticalCalibration') toa.SetParameterInputImage('in', self.pipe[p].GetParameterOutputImage(self.out_p[p])) toa.SetParameterInt('clamp', int(clamp)) toa.Execute() self.append(toa) bm = otb.Registry.CreateApplication('BandMathX') bm.AddImageToParameterInputImageList('il', self.pipe[-1].GetParameterOutputImage('out')) bm.SetParameterString('exp', '10000*im1') bm.Execute() ty = self.REF_TYPE self.append(bm, fn, ty, 'out', is_output=True) return def orthorectify(self, dem_fld, geoid = None): assert (os.path.isdir(dem_fld)) proc_idx = self.out_idx.copy() self.out_idx = [] gspc_idx = 0 for t in proc_idx: ortho = otb.Registry.CreateApplication('OrthoRectification') ortho.SetParameterString('elev.dem', dem_fld) if geoid is not None: assert (os.path.exists(geoid)) ortho.SetParameterString('elev.geoid', geoid) ortho.SetParameterInputImage('io.in', self.pipe[t].GetParameterOutputImage(self.out_p[t])) ortho.SetParameterInt('opt.gridspacing', self.GRID_SPC[gspc_idx]) ortho.Execute() fn = self.files[t].replace(self.RS_LABEL, 'ORTHO') ty = self.REF_TYPE self.append(ortho, fn, ty, 'io.out', is_output=True) gspc_idx += 1 def pansharp(self): proc_idx = self.out_idx.copy() self.out_idx = [] assert(len(proc_idx) == 2) btps = otb.Registry.CreateApplication("BundleToPerfectSensor") btps.SetParameterInputImage('inp', self.pipe[proc_idx[0]].GetParameterOutputImage(self.out_p[proc_idx[0]])) btps.SetParameterInputImage('inxs', self.pipe[proc_idx[1]].GetParameterOutputImage(self.out_p[proc_idx[1]])) btps.SetParameterString('method', 'bayes') btps.Execute() fn = self.files[proc_idx[1]].replace(self.MS_LABEL, 'PS') ty = self.REF_TYPE self.append(btps, fn, ty, 'out', is_output=True) def rigid_align(self, ref_img, this_band=0, ref_band=2, geobin_radius=32, num_geobins=128, margin=32, filter=5): si = otb.Registry.CreateApplication("Superimpose") si.SetParameterInputImage('inm', self.pipe[-1].GetParameterOutputImage(self.out_p[-1])) si.SetParameterString('inr', ref_img) si.Execute() sz = si.GetImageSize('out') sz = min(sz[0], sz[1]) geobin_size = 2 * geobin_radius + 1 geobin_spacing = floor((sz - geobin_size * ceil(sqrt(num_geobins)) - 2 * margin) / 4) sh = compute_displacement(si, to_otb_pipeline(ref_img), src_band=this_band, tgt_band=ref_band, out_param_src='out', out_param_tgt='out', geobin_size=geobin_size, geobin_spacing=geobin_spacing, margin=margin, filter=filter) if sh is not None: ref_spc = si.GetImageSpacing('out') self.shift = (ref_spc[0]*sh[0][1], ref_spc[1]*sh[0][0]) return def clip(self, roi, buffer=0): assert(os.path.exists(roi)) proc_idx = self.out_idx.copy() self.out_idx = [] for t in proc_idx: spc = self.pipe[t].GetImageSpacing(self.out_p[t]) er = otb.Registry.CreateApplication('ExtractROI') er.SetParameterInputImage('in', self.pipe[t].GetParameterOutputImage(self.out_p[t])) er.SetParameterString('mode', 'extent') ds = ogr.Open(roi) ly = ds.GetLayer(0) extent = list(ly.GetExtent()) extent = [extent[0] - buffer, extent[1] + buffer, extent[2] - buffer, extent[3] + buffer] if spc[0] < 0: tmp = extent[0] extent[0] = extent[1] extent[1] = tmp if spc[1] < 0: tmp = extent[2] extent[2] = extent[3] extent[3] = tmp if ly.GetSpatialRef().GetAuthorityCode('PROJCS') == '4326': er.SetParameterString('mode.extent.unit', 'lonlat') else: er.SetParameterString('mode.extent.unit', 'phy') er.SetParameterFloat('mode.extent.ulx', extent[0]) er.SetParameterFloat('mode.extent.uly', extent[2]) er.SetParameterFloat('mode.extent.lrx', extent[1]) er.SetParameterFloat('mode.extent.lry', extent[3]) ly, ds = None, None er.Execute() fn = self.files[t] ty = self.types[t] self.append(er, fn, ty, 'out', is_output=True) def append(self, app, fname=None, ftype=None, outp=None, is_output=False): if is_output: self.out_idx.append(len(self.pipe)) self.pipe.append(app) self.files.append(fname) self.types.append(ftype) self.out_p.append(outp) def write_outputs(self, fld, roi=None, update_pipe=False, compress=False): out = [] out_idx_bck = proc_idx = self.out_idx.copy() if roi is not None: pipe_length = len(self.pipe) self.clip(roi) proc_idx = self.out_idx.copy() if update_pipe: assert roi is None, 'Cannot set output files as pipe input over a ROI.' self.out_idx = [] for t in proc_idx: out_file = os.path.join(fld, self.files[t]) if compress: out_file += '?gdal:co:compress=deflate&gdal:co:bigtiff=yes' if not os.path.exists(os.path.dirname(out_file)): os.makedirs(os.path.dirname(out_file)) self.pipe[t].SetParameterString(self.out_p[t], out_file) self.pipe[t].SetParameterOutputImagePixelType(self.out_p[t], self.types[t]) self.pipe[t].ExecuteAndWriteOutput() if self.shift is not None: print("INFO: Applying shift {} to current outputs.".format(self.shift)) ds = gdal.Open(out_file, 1) geot = list(ds.GetGeoTransform()) geot[0] += self.shift[0] geot[3] += self.shift[1] ds.SetGeoTransform(tuple(geot)) ds = None out.append(out_file) if update_pipe: self.append(to_otb_pipeline(out_file), self.files[t], self.types[t], 'out', is_output=True) if update_pipe: self.shift = None if roi is not None: self.out_idx = out_idx_bck self.pipe = self.pipe[:pipe_length] self.files = self.files[:pipe_length] self.types = self.types[:pipe_length] self.out_p = self.out_p[:pipe_length] return out class PleiadesOrthoPipeline(SPOT67RasterPipeline): # BEGIN PLEIADES (ORT) VHR PROTOTYPE MS_FOLDER_PATTERN = 'IMG_PHR*_MS_*' PAN_FOLDER_PATTERN = 'IMG_PHR*_P_*' TILE_PATTERN = '*_R*C*.TIF' NDT = 0.0 REF_TYPE = otb.ImagePixelType_uint16 MS_LABEL = 'MS' OPT_IN = '' class PleiadesOrthoAutoMosaicPipeline: def __init__(self, mosaic_root, pattern, out_fld): lst = sorted(glob.glob(os.path.join(mosaic_root, pattern))) lst = [x for x in lst if os.path.isdir(x)] self.scene_list = [PleiadesOrthoPipeline(x) for x in lst] self.processed_scenes = [] self.out_fld = out_fld def process_scenes(self, roi=None, compress=False): for s in self.scene_list: s.to_toa() if roi is not None and os.path.exists(roi): s.clip(roi) s.write_outputs(self.out_fld, update_pipe=True, compress=compress) s.pansharp() self.processed_scenes.extend(s.write_outputs(self.out_fld, compress=compress)) return def generate_mosaic(self, vrt=True): if len(self.processed_scenes) > 0: lst = self.processed_scenes fn = lst[0].split('_') fn[-3] = 'MOSAIC' fn = '_'.join(fn) if vrt is True: vrtopt = gdal.BuildVRTOptions() fn = fn.replace('.TIF', '.vrt') gdal.BuildVRT(fn, lst, options=vrtopt) else: print('Not implemented yet.') return fn else: print('Preprocess scenes first.') return None