# -*- coding: utf-8 -*- """ *************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * *************************************************************************** """ from qgis.PyQt.QtCore import QCoreApplication from qgis.core import (QgsProcessingAlgorithm, QgsProcessingParameterFile, QgsProcessingParameterString, QgsProcessingParameterNumber, QgsProcessingParameterRasterLayer, QgsProcessingParameterRasterDestination) from qgis import processing import os import gdal import datetime def find_next_date(datelist, date): for i,l in enumerate(datelist): if l >= date: return i return None def find_first_month_day(dates, month=1, day=1): start_year = dates[0].year test_date = datetime.datetime(start_year,month,day) if dates[0] > test_date: start_year += 1 test_date = datetime.datetime(start_year, month, day) return find_next_date(dates, test_date) def get_period_intervals(date_file, md=[1, 1], duration=365): dates = [] with open(date_file) as f: for l in f.read().splitlines(): try: dates.append(datetime.datetime.strptime(l, '%Y%m%d')) except: continue delta = dates[-1] - dates[-2] periods = [] s, e = 0, 0 S = 0 while s is not None and e is not None: s = find_first_month_day(dates, md[0], md[1]) if s is not None: e = find_next_date(dates, dates[s] + datetime.timedelta(days=duration) - delta) if e is not None: periods.append((S + s, S + e + 1)) S += s + 1 dates = dates[s + 1:] return periods def get_mean_expression(periods): l = periods[0][1] - periods[0][0] expr = [] for i in range(1, l + 1): s = [p[0] + i for p in periods] expr.append('(' + '+'.join(['im1b%d' % x for x in s]) + ')/' + str(len(s))) return '{' + ';'.join(expr) + '}' def NDVI_subdates_maker(TS_file,datefile,output_folder,begin_date,end_date,context,feedback): raster_band_list = [] dates_list= [] with open(datefile,'r') as f: for i,date in enumerate(f): if int(date)>=begin_date and int(date) <=end_date: raster_band_list.append(i+1) dates_list.append(int(date)) exp='' for i in raster_band_list : exp+='im1b'+str(i)+';' exp=exp[:-1] out = os.path.join(output_folder,"sub_ndvi_ts.tif") BMX_parameters = {'il':[TS_file],'exp':exp, 'out':out, 'outcontext':os.path.join(output_folder,"outcontext.txt")} processing.run('otb:BandMathX',BMX_parameters, context=context, feedback=feedback) os.remove(os.path.join(output_folder,"outcontext.txt")) with open(os.path.join(output_folder,os.path.basename(datefile)),'w') as f: for date in dates_list: f.write(str(date)+'\n') return out, os.path.join(output_folder,os.path.basename(datefile)) def setNoDataValue(fn, val=None): ds = gdal.Open(fn, gdal.GA_Update) if val is None: ds.GetRasterBand(1).DeleteNoDataValue() else: ds.GetRasterBand(1).SetNoDataValue(val) ds = None def getNoDataValue(fn): ds = gdal.Open(fn) ndv = ds.GetRasterBand(1).GetNoDataValue() ds = None return ndv def moy_NDVI_TS(TS,dates, month, day, duration): periods = get_period_intervals(dates, [month,day], duration) return get_mean_expression(periods) class LandscapeStratification(QgsProcessingAlgorithm): INPUT = 'INPUT' INPUTDATE = 'INPUTDATE' BEGDATE = 'BEGDATE' ENDDATE = 'ENDDATE' STARTMONTH = 'STARTMONTH' STARTDAY = 'STARTDAY' DURATION = 'DURATION' OUTPUT = 'OUTPUT' PREFIX = 'PREFIX' CBEGIN = 'CBEGIN' CEND = 'CEND' def tr(self, string): return QCoreApplication.translate('Processing', string) def createInstance(self): return LandscapeStratification() def name(self): return 'LandscapeStratification' def displayName(self): return self.tr('LandscapeStratification') def group(self): return self.tr('TSNDVI') def groupId(self): return 'TSNDVI' def shortHelpString(self): return self.tr("Compute a landscape stratification metric") def initAlgorithm(self, config=None): self.addParameter( QgsProcessingParameterRasterLayer( self.INPUT, self.tr('Input NDVI stack') ) ) self.addParameter( QgsProcessingParameterFile( self.INPUTDATE, self.tr('Input NDVI dates file'), optional=True ) ) self.addParameter( QgsProcessingParameterString( self.BEGDATE, self.tr('Beginning date for selection (YYYYMMDD)'), defaultValue='20000101' ) ) self.addParameter( QgsProcessingParameterString( self.ENDDATE, self.tr('End date for selection (YYYYMMDD)'), defaultValue=datetime.date.today().strftime("%Y%m%d") ) ) self.addParameter( QgsProcessingParameterNumber( self.STARTMONTH, self.tr('Starting month for multi-annual mean'), type=0, defaultValue=1, minValue=1, maxValue=12 ) ) self.addParameter( QgsProcessingParameterNumber( self.STARTDAY, self.tr('Starting day for multi-annual mean'), type=0, defaultValue=1, minValue=1, maxValue=31 ) ) self.addParameter( QgsProcessingParameterNumber( self.DURATION, self.tr('Period length for multi-annual mean'), type=0, defaultValue=365, minValue=1 ) ) self.addParameter( QgsProcessingParameterNumber( self.CBEGIN, self.tr('Starting PCA component'), type=0, defaultValue=2)) self.addParameter( QgsProcessingParameterNumber( self.CEND, self.tr('Final PCA component (0 for last)'), type=0, defaultValue=5)) self.addParameter( QgsProcessingParameterRasterDestination( self.OUTPUT, self.tr('Output landscape stratification metric') ) ) def processAlgorithm(self, parameters, context, feedback): Smooth_TS = self.parameterAsString( parameters, self.INPUT, context ) datefile = self.parameterAsString( parameters, self.INPUTDATE, context ) output_raster = self.parameterAsOutputLayer( parameters, self.OUTPUT, context ) output_folder = os.path.dirname(output_raster) prefix = os.path.splitext(os.path.basename(output_raster))[0] + '_' begin_date = parameters['BEGDATE'] if parameters['BEGDATE'] != None else 20000101 end_date = parameters['ENDDATE'] if parameters['ENDDATE'] != None else int(datetime.date.today().strftime("%Y%m%d")) cbegin = parameters['CBEGIN'] cend = parameters['CEND'] tmp_name = prefix + 'temp' tmp_folder = os.path.join(output_folder,tmp_name) if not os.path.exists(tmp_folder): os.mkdir(tmp_folder) if datefile == '': datefile = os.path.splitext(Smooth_TS)[0] + '_dates.txt' if int(begin_date)!=20000101 or int(end_date)!=int(datetime.date.today().strftime("%Y%m%d")): TS_file, datefile = NDVI_subdates_maker(Smooth_TS,datefile,tmp_folder,int(begin_date),int(end_date),context,feedback) else : TS_file = Smooth_TS ndv = getNoDataValue(TS_file) # Compute mean time series over the period exp = moy_NDVI_TS(TS_file,datefile,parameters['STARTMONTH'], parameters['STARTDAY'], parameters['DURATION']) Moy_TS = os.path.join(tmp_folder, prefix+"LandStrat_mean.tif") BMX_parameters = {'il':[TS_file],'exp':exp, 'out':Moy_TS, "outputpixeltype":2, 'outcontext':os.path.join(output_folder,"outcontext.txt")} processing.run("otb:BandMathX", BMX_parameters, context=context, feedback=feedback) os.remove(os.path.join(output_folder,"outcontext.txt")) if ndv is not None: setNoDataValue(Moy_TS, ndv) mask = os.path.join(tmp_folder, prefix + "mask.tif") ND_app_parameters = {"in": TS_file, "out": mask, "mode": "buildmask", 'mode.apply.mask': TS_file, "outputpixeltype": 0} processing.run('otb:ManageNoData', ND_app_parameters, context=context, feedback=feedback) # Compute metric using no-data value LS_Strat_pre = os.path.join(tmp_folder, prefix+"LandStrat_metric_pre.tif") LS_app_parameters = {"ndvits": Moy_TS, "out": LS_Strat_pre, "outputpixeltype": 5} if ndv is not None: LS_app_parameters["bv"] = ndv if cbegin != None : LS_app_parameters["cbegin"]=cbegin if cend != None : LS_app_parameters["cend"]=cend processing.run("otb:LandscapeStratificationMetric", LS_app_parameters, context=context, feedback=feedback) LS_Strat_raw = os.path.join(tmp_folder, prefix + "LandStrat_metric_raw.tif") ND_app_parameters = {'in': LS_Strat_pre, 'out': LS_Strat_raw, 'mode': 'apply', 'mode.apply.mask': mask, 'outputpixeltype': 5} processing.run('otb:ManageNoData', ND_app_parameters, context=context, feedback=feedback) setNoDataValue(LS_Strat_raw, ndv) LS_Strat_norm = os.path.join(tmp_folder, prefix+"LandStrat_metric_norm.tif") DC_app_parameters={"in":LS_Strat_raw, "out":LS_Strat_norm, "mask":mask, "outmin":0, "outmax":2048, "outputpixeltype":2} processing.run("otb:DynamicConvert", DC_app_parameters, context=context, feedback=feedback) LS_Strat = output_raster ND_app_parameters = {'in': LS_Strat_norm, 'out':LS_Strat, 'mode':'apply', 'mode.apply.mask':mask, 'outputpixeltype':2} out_layer = processing.run('otb:ManageNoData', ND_app_parameters, context=context, feedback=feedback) setNoDataValue(LS_Strat, ndv) return {self.OUTPUT: out_layer}