Source code for Landsat8_by_date

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of PHYMOBAT 1.0.
# Copyright 2016 Sylvio Laventure (IRSTEA - UMR TETIS)
# 
# PHYMOBAT 1.0 is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# PHYMOBAT 1.0 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with PHYMOBAT 1.0.  If not, see <http://www.gnu.org/licenses/>.

import os, sys
import math, subprocess
try :
    import gdal
except :
    from osgeo import gdal

import numpy as np

[docs]class Landsat8_by_date(): """ Satellite image processing's class. This class include several processes to group images by date, mosaic images by date, extract images information, compute ndvi, compute cloud pourcent and create new rasters. :param class_archive: Archive class name with every information on downloaded images :type class_archive: class :param big_folder: Image processing folder :type big_folder: str :param one_date: [year, month, day] ... This variable is modified in the function :func:`mosaic_by_date()`. To append mosaic image path, mosaic cloud image path, cloud pixel value table, mosaic ndvi image path and ndvi pixel value table. :type one_date: list of str """ def __init__(self, class_archive, big_folder, one_date): """Create a new 'Landsat_by_date' instance """ self._class_archive = class_archive self._big_folder = big_folder self._one_date = one_date # Verify list of list of str if one_date == []: print "Enter dates to mosaic images like [[str(year), str(month), str(day)], [str(year), str(month), str(day)], ...]" sys.exit(1) else: self._one_date = one_date
[docs] def group_by_date(self, d_uni): """ Function to extract images on a single date :param d_uni: [year, month, day] :type d_uni: list of str :returns: list of str -- variable **group** = [year, month, day, multispectral image path, cloud image path] """ # Group of images with the same date group = [] # Take images with the same date for d_dup in self._class_archive.list_img: if d_dup[:3] == d_uni: group.append(d_dup) return group
[docs] def vrt_translate_gdal(self, vrt_translate, src_data, dst_data): """ Function to launch gdal tools in command line. With ``gdalbuildvrt`` and ``gdal_translate``. This function is used :func:`mosaic_by_date` to mosaic image by date. :param vrt_translate: ``vrt`` or ``translate`` :type vrt_translate: str :param src_data: Data source. Several data for vrt process and one data (vrt data) for gdal_translate :type src_data: list (process ``vrt``) or str (process ``translate``) :param dst_data: Output path :type dst_data: str """ if os.path.exists(dst_data): os.remove(dst_data) # Select process if vrt_translate == 'vrt': # Verify input data if type(src_data) is not np.ndarray: print 'VRT file ! The data source should be composed of several data. A list minimal of 2 dimensions' sys.exit(1) print 'Build VRT file' if not os.path.exists(dst_data): process_tocall = ['gdalbuildvrt', '-srcnodata', '-10000', dst_data] # Copy rasters for cp_image in src_data: process_tocall.append(cp_image) elif vrt_translate == 'translate': # Verify input data try : src_data = str(src_data) except:# if type(src_data) is not str: print 'Geotiff file ! The data source should be composed of path file. A character string !' sys.exit(1) print 'Build Geotiff file' if not os.path.exists(dst_data): process_tocall = ['gdal_translate', '-a_nodata', '-10000', src_data, dst_data] # Launch vrt process subprocess.call(process_tocall)
[docs] def mosaic_by_date(self): """ Function to merge images of the same date in a image group :func:`group_by_date`. """ # Create the processing images folder if not exists if not os.path.exists(self._class_archive._folder + '/' + self._big_folder): os.mkdir(self._class_archive._folder + '/' + self._big_folder) # Matrix multi images for a single date group = self.group_by_date(self._one_date) # Every images [year, month, day, multispectral image, cloud image] group_ = np.transpose(np.array(group)) # Transpose matrix to extract path of images # Create a folder with images year if it doesn't exist index_repertory_img = self._one_date[0] if not os.path.exists(self._class_archive._folder + '/' + self._big_folder + '/' + index_repertory_img): os.mkdir(self._class_archive._folder + '/' + self._big_folder + '/' + index_repertory_img) index_repertory_img = index_repertory_img + '/' # Create a folder with images date if it doesn't exist for d_ in self._one_date: index_repertory_img = index_repertory_img + d_ if not os.path.exists(self._class_archive._folder + '/' + self._big_folder + '/' + index_repertory_img): os.mkdir(self._class_archive._folder + '/' + self._big_folder + '/' + index_repertory_img) # Build VRT file with data images required vrt_out = self._class_archive._folder + '/' + self._big_folder + '/' + index_repertory_img + '/' \ + self._class_archive._captor + index_repertory_img.split("/")[1] + '.VRT' # Multispectral VRT outfile self.vrt_translate_gdal('vrt', group_[3], vrt_out) vrtcloud_out = self._class_archive._folder + '/' + self._big_folder + '/' + index_repertory_img + '/' \ + self._class_archive._captor + index_repertory_img.split("/")[1] + '_' + group_[4][0][-7:-4] + '.VRT' # Cloud TIF outfile self.vrt_translate_gdal('vrt', group_[4], vrtcloud_out) # Build Geotiff file with data images required gtif_out = vrt_out[:-4] + '.TIF' # Multispectral VRT outfile self.vrt_translate_gdal('translate', vrt_out, gtif_out) self._one_date.append(gtif_out) gtifcloud_out = vrtcloud_out[:-4] + '.TIF' # Cloud TIF outfile self.vrt_translate_gdal('translate', vrtcloud_out, gtifcloud_out) self._one_date.append(gtifcloud_out)
[docs] def raster_data(self, img): """ Function to extract raster information. Return table of pixel values and raster information like line number, pixel size, ... (gdal pointer) :param img: Raster path :type img: str :returns: matrix -- variable **data**, Pixel value matrix of a raster. gdal pointer -- variable **_in_ds**, Raster information. """ # Load Gdal's drivers gdal.AllRegister() # Loading input raster print 'Loading input raster :', os.path.split(str(img))[1][:-4] in_ds = gdal.Open(str(img), gdal.GA_ReadOnly) # if it doesn't exist if in_ds is None: print('could not open ') sys.exit(1) # Information on the input raster nbband = in_ds.RasterCount # Spectral band number rows = in_ds.RasterYSize # Rows number cols = in_ds.RasterXSize # Columns number # Table's declaration data = [] #np.float32([[0]*cols for i in xrange(rows)]) for band in range(nbband): canal = in_ds.GetRasterBand(band + 1) # Select a band if nbband == 1: data = canal.ReadAsArray(0, 0, cols, rows).astype(np.float32) # Assign pixel values at the data else: data.append(canal.ReadAsArray(0, 0, cols, rows).astype(np.float32)) print('Copie des pixels du raster ! Bande :', (band + 1)) ################################### # Close input raster _in_ds = in_ds in_ds = None return data, _in_ds
[docs] def pourc_cloud(self, img_spec, img_cloud): """ Return clear pixel percentage on the image **img_spec** because of a cloud image **img_cloud**. :param img_spec: Spectral image path :type img_spec: str :param img_cloud: Cloud image path :type img_cloud: str :returns: float -- variable **nb0**, clear pixel percentage. :Example: >>> import Landsat8_by_date >>> Landsat_test = Landsat8_by_date(class_archive, big_folder, one_date) >>> nb0_test = Landsat_test.pourc_cloud(Landsat_test._one_date[3], Landsat_test._one_date[4]) >>> nb0_test 98 """ # Extract raster's information data_spec, info_spec = self.raster_data(img_spec) data_cloud, info_cloud = self.raster_data(img_cloud) self._one_date.append(data_cloud) # Add cloud pixel value table # Extent of the images mask_spec = np.in1d(data_spec[0], [-10000, math.isnan], invert=True) # ex : array([ True, True, True, True, False, True, True, True, True], dtype=bool) -> False where there is -10000 ou NaN # Print area account of the pixel size 'info_spec.GetGeoTransform()' print 'Area = ' + str(float((np.sum(mask_spec) * info_spec.GetGeoTransform()[1] * abs(info_spec.GetGeoTransform()[-1]) )/10000)) + 'ha' # Cloud mask mask_cloud = np.in1d(data_cloud, 0) # This is the same opposite False where there is 0 cloud = np.choose(mask_cloud, (False, mask_spec)) # If True in cloud mask, it take spectral image else False dist = np.sum(cloud) # Sum of True. True is cloud # Computer cloud's percentage with dist (sum of cloud) by sum of the image's extent try : nb0 = float(dist)/(np.sum(mask_spec)) print('For ' + os.path.split(str(img_spec))[1][:-4] + ', cloud cover ' + str(100 - round(nb0*100, 2)) + "%") except ZeroDivisionError: nb0 = 0 print("The raster isn\'t in the area !") return nb0
[docs] def calcul_ndvi(self, img_spec): """ Computer NDVI index for a Landsat image. NDVI = band4 - band3 / band4 + band3 :param img_spec: Spectral image path :type img_spec: str """ # Extract raster's information data, in_ds = self.raster_data(img_spec) # Computer NDVI mask = np.greater(data[0], -10000) ndvi = np.choose(mask, (-10000, eval('(data[4]-data[3])') / eval('(data[4]+data[3])'))) # If True, -10000 (NaN) else compute mathematical operation # Outfile name img_ndvi = img_spec[:-4] + '_ndvi.TIF' self._one_date.append(img_ndvi) # Add ndvi image path self._one_date.append(ndvi) # Add ndvi pixel value table self.create_raster(img_ndvi, ndvi, in_ds)
[docs] def create_raster(self, out_raster, data, in_ds): """ Create raster :param out_raster: Output image path :type out_raster: str :param data: Pixel value matrix. Matrix size equal to that of a raster. :type data: matrix :param in_ds: Raster information :type in_ds: gdal pointer """ if os.path.exists(str(out_raster)): os.remove(str(out_raster)) # Verify if the processing take input band or one spectral band if data.ndim == 2: nbband = 1 else: nbband = in_ds.RasterCount if not os.path.exists(str(out_raster)): # Create outfile driver = gdal.GetDriverByName('GTiff') out_ds = driver.Create(str(out_raster), in_ds.RasterXSize, in_ds.RasterYSize, nbband, gdal.GDT_Float32) if out_ds is None: print 'Could not create ' + os.path.split(str(out_raster))[1] sys.exit(1) p = 0 # Increment for the number band while p < nbband: #Incrementation p = p + 1 print "Copie sur la bande ", p # Get extent coordinates and raster resolution transform = in_ds.GetGeoTransform() # print transform minX = transform[0] maxY = transform[3] pixelWidth = transform[1] pixelHeight = transform[5] geotransform = [minX, pixelWidth, 0, maxY, 0, pixelHeight] # Record projection def_projection = in_ds.GetProjection() # Loading spectral band of outfile out_band = out_ds.GetRasterBand(p) # write the data if data.ndim == 2: out_band.WriteArray(data, 0, 0) else: out_band.WriteArray(data[p-1], 0, 0) # Closing and statistics on output raster out_band.FlushCache() out_band.SetNoDataValue(-10000) out_band.GetStatistics(-1, 1) out_band = None # Set the geo-traking and outfile projection out_ds.SetGeoTransform(geotransform) out_ds.SetProjection(def_projection) # Close les données ouvertes out_ds = None