s2planetary.py 3.44 KB
Newer Older
from TimeSeries.s2sen2cor import *
import planetary_computer as PC
from pystac_client import Client
import rasterio
from Common.geotools import get_query_bbox
from pyproj import Transformer as T
from shapely.geometry import Polygon
def fetch(shp, dt, output_fld, auth=None, band_list=None):
    bbox, i_bbox, shp_srs = get_query_bbox(shp, return_all=True)
    if auth is not None:
        os.environ['PC_SDK_SUBSCRIPTION_KEY'] = auth
    api = Client.open('https://planetarycomputer.microsoft.com/api/stac/v1', modifier=PC.sign_inplace)
    res = api.search(collections="sentinel-2-l2a", bbox=bbox, datetime=dt)
    lst = ['B02','B03','B04','B05','B06','B07','B08','B8A','B11','B12','SCL']
    if band_list is not None:
        lst = band_list
    prg = tqdm.tqdm(total=len(res.item_collection()) * len(lst), desc="Fetching from Planetary")
        with rasterio.open(item.assets['B02'].href) as ds:
            img_srs = ds.crs.to_epsg()
        if shp_srs == img_srs:
            o_bbox = [20 * round(x / 20) for x in i_bbox]
        else:
            tr = T.from_crs(shp_srs, img_srs, always_xy=True)
            x_min, y_min = tr.transform(i_bbox[0], i_bbox[1])
            x_max, y_max = tr.transform(i_bbox[2], i_bbox[3])
            o_bbox = [20 * round(x / 20) for x in [x_min, y_min, x_max, y_max]]
            ofn = os.path.join(output_fld, '/'.join(item.assets[a].get_absolute_href().split('?')[0].split('/')[10:]))
            os.makedirs(os.path.dirname(ofn),exist_ok=True)
            with rasterio.open(item.assets[a].href) as img:
                out_img, out_geot = rasterio.mask.mask(img,
                                                       [Polygon.from_bounds(o_bbox[0], o_bbox[1],
                                                                            o_bbox[2], o_bbox[3])],
                                                       crop=True)
                out_meta = img.meta
            out_meta.update({"driver": "GTiff",
                             "height": out_img.shape[1],
                             "width": out_img.shape[2],
                             "transform": out_geot})
            with rasterio.open(ofn, "w", **out_meta) as dest:
                dest.write(out_img)
    if band_list is None:
        return S2PlaneteryPipeline(output_fld)
    else:
        warnings.warn("Queried for a non-default band list. Skipping pipeline setup.")
        return
class S2PlanetaryTilePipeline(S2Sen2CorTilePipeline):
    NAME = 'S2-L2A-SEN2COR-PLANETARY'
    PTRN_10m = ['GRANULE/*/IMG_DATA/R10m/*_B02_10m.tif',
                'GRANULE/*/IMG_DATA/R10m/*_B03_10m.tif',
                'GRANULE/*/IMG_DATA/R10m/*_B04_10m.tif',
                'GRANULE/*/IMG_DATA/R10m/*_B08_10m.tif']
    PTRN_20m = ['GRANULE/*/IMG_DATA/R20m/*_B05_20m.tif',
                'GRANULE/*/IMG_DATA/R20m/*_B06_20m.tif',
                'GRANULE/*/IMG_DATA/R20m/*_B07_20m.tif',
                'GRANULE/*/IMG_DATA/R20m/*_B8A_20m.tif',
                'GRANULE/*/IMG_DATA/R20m/*_B11_20m.tif',
                'GRANULE/*/IMG_DATA/R20m/*_B12_20m.tif']
    PTRN_msk = ['GRANULE/*/IMG_DATA/R20m/*_SCL_20m.tif']
    PTRN_ful = PTRN_10m[0:3] + PTRN_20m[0:3] + [PTRN_10m[3]] + PTRN_20m[3:]
class S2PlaneteryPipeline(S2Sen2CorPipeline):
    S2TilePipeline = S2PlanetaryTilePipeline
    _check = S2TilePipeline._check
    _tile_id = S2TilePipeline._tile_id