# -*- coding: utf-8 -*- """ *************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * *************************************************************************** """ import warnings from qgis.PyQt.QtCore import QCoreApplication from qgis.core import (QgsProcessingAlgorithm, QgsProcessingParameterFile, QgsProcessingParameterRasterDestination, QgsProcessingParameterString, QgsProcessingParameterEnum) from qgis import processing import glob import gdal import os import sys import shutil import datetime sys.path.append(os.path.dirname(__file__)) from utils import setNoDataValue, getNoDataValue class TemporalSmoothing(QgsProcessingAlgorithm): INPUT = 'INPUT' OUTPUT = 'OUTPUT' TSINT = 'TSINT' TSPARAMS = 'TSPARAMS' def tr(self, string): return QCoreApplication.translate('Processing', string) def createInstance(self): return TemporalSmoothing() def name(self): return 'TemporalSmoothing' def displayName(self): return self.tr('Temporal Smoothing') def group(self): return self.tr('TSNDVI') def groupId(self): return 'TSNDVI' def shortHelpString(self): return self.tr("Apply Savitzky-Golay filtering to a single channel time series (e.g. NDVI).") def initAlgorithm(self, config=None): self.addParameter( QgsProcessingParameterFile( self.INPUT, self.tr('Input folder') ) ) self.addParameter( QgsProcessingParameterEnum( self.TSINT, self.tr('Temporal smoothing method : sg/irsg'), options=['sg','irsg'], defaultValue='sg' ) ) self.addParameter( QgsProcessingParameterString( self.TSPARAMS, self.tr('Temporal smoothing parameters : sg.deg sg.rad / irsg.ltdeg irsg.ltrad irsg.stdeg irsg.strad irsg.threshval irsg.threshtime'), defaultValue='2 2' )) self.addParameter( QgsProcessingParameterRasterDestination( self.OUTPUT, self.tr('Output smoothed time series') ) ) def processAlgorithm(self, parameters, context, feedback): """ Here is where the processing itself takes place. """ TS_folder = self.parameterAsString( parameters, self.INPUT, context ) output = self.parameterAsOutputLayer( parameters, self.OUTPUT, context ) deg=None rad=None ltdeg = None ltrad = None stdeg = None strad = None threshval = None threshtime = None options=['sg','irsg'] method = options[parameters['TSINT']] nb_param = 2 if method == 'sg' else 6 if len(parameters['TSPARAMS'].split(' '))==nb_param : ts_params = parameters['TSPARAMS'].split(' ') if len(ts_params) == 2: deg = ts_params[0] rad = ts_params[1] elif len(ts_params) == 6: ltdeg = ts_params[0] ltrad = ts_params[1] stdeg = ts_params[2] strad = ts_params[3] threshval = ts_params[4] threshtime = ts_params[5] else : sys.exit('TSPARAMS should have {} parameters with {} smoothing method'.format(nb_param, method)) NDVI_TS_files = sorted(glob.glob(os.path.join(TS_folder, '*NDVI*.tif'))) dates = [x.split('_')[-2][3:] for x in NDVI_TS_files] sdates = [datetime.datetime.strptime(d, '%Y%j').date().strftime('%Y%m%d') for d in dates] datefile = os.path.splitext(output)[0] + '_dates.txt' with open(datefile, 'w') as df: df.write('\n'.join(sdates) + '\n') ndv = getNoDataValue(NDVI_TS_files[0]) nd = len(NDVI_TS_files) tmp_folder = os.path.join(os.path.dirname(output), 'temp') if not os.path.exists(tmp_folder): os.mkdir(tmp_folder) ts_vrt_opt = gdal.BuildVRTOptions(separate=True) ts_vrt = os.path.join(tmp_folder, os.path.splitext(os.path.basename(TS_folder))[0] + '_list.vrt') gdal.BuildVRT(ts_vrt, NDVI_TS_files, options=ts_vrt_opt) Smooth_TS_pre = os.path.join(tmp_folder, os.path.splitext(os.path.basename(output))[0] + '_tmp.tif') Smooth_app_parameters = {"il": [ts_vrt], "dates": datefile, "out": Smooth_TS_pre, "outputpixeltype":2} if deg != None: Smooth_app_parameters['interp.sg.deg'] = deg if rad != None: Smooth_app_parameters['interp.sg.rad'] = rad if ltdeg != None: Smooth_app_parameters['interp.irsg.ltdeg'] = ltdeg if ltrad != None: Smooth_app_parameters['interp.irsg.ltrad'] = ltrad if stdeg != None: Smooth_app_parameters['interp.irsg.stdeg'] = stdeg if strad != None: Smooth_app_parameters['interp.irsg.strad'] = strad if threshval != None: Smooth_app_parameters['interp.irsg.threshval'] = threshval if threshtime != None: Smooth_app_parameters['interp.irsg.threshtime'] = threshtime processing.run("otb:TemporalSmoothing", Smooth_app_parameters, context=context, feedback=feedback) setNoDataValue(Smooth_TS_pre, ndv) msks = os.path.join(tmp_folder, os.path.splitext(os.path.basename(output))[0] + '_msks.tif') msks_exp = '{' + ';'.join(['im1b%d == ' % i + str(ndv) for i in range(1, nd+1)]) + '}' BMX_params = {'il': Smooth_TS_pre, 'exp': msks_exp, 'out': msks, "outputpixeltype": 0} processing.run('otb:BandMathX', BMX_params, context=context, feedback=feedback) vmsk = os.path.join(tmp_folder, os.path.splitext(os.path.basename(output))[0] + '_vmsk.tif') vmsk_exp = '+'.join(['im1b%d' % i for i in range(1,nd+1)]) + '!=' + str(nd) BM_params = {'il': [msks], 'exp': vmsk_exp, 'out': vmsk, "outputpixeltype": 0} processing.run('otb:BandMath', BM_params, context=context, feedback=feedback) cmsk = os.path.join(tmp_folder, os.path.splitext(os.path.basename(output))[0] + '_cmsk.tif') cmsk_exp = '{' + ';'.join(['im1b%d*im2b1' % i for i in range(1,nd+1)]) + '}' BMX_params = {'il': [msks, vmsk], 'exp': cmsk_exp, 'out': cmsk, "outputpixeltype": 0} processing.run('otb:BandMathX', BMX_params, context=context, feedback=feedback) gf = output gf_params = {'in': Smooth_TS_pre, 'out': gf, 'mask': cmsk, 'comp': 1, 'it': 'linear', 'id': datefile, 'od': datefile, "outputpixeltype": 2} out = processing.run('otb:ImageTimeSeriesGapFilling', gf_params, context=context, feedback=feedback) setNoDataValue(gf, ndv) try: shutil.rmtree(tmp_folder) except: warnings.warn('Could not delete temporary folder') return {self.OUTPUT: out}