Forked from HYCAR-Hydro / airGR
Source project has a limited visibility.
s1planetary.py 3.64 KiB
import os.path
from TimeSeries.s1base import *
import planetary_computer as PC
from pystac_client import Client
from pyproj import Transformer as T
from shapely.geometry import Polygon
import rasterio
import rasterio.mask
import tqdm
import otbApplication as otb
import json
from Common.geotools import get_query_bbox

def fetch(shp, dt, output_fld, api_key, direction=None):

    os.environ['PC_SDK_SUBSCRIPTION_KEY'] = api_key
    bbox, i_bbox, shp_srs = get_query_bbox(shp, return_all=True)

    api = Client.open('https://planetarycomputer.microsoft.com/api/stac/v1', modifier=PC.sign_inplace)
    res = api.search(collections="sentinel-1-rtc", bbox=bbox, datetime=dt)
    lst = ['vh', 'vv']

    prg = tqdm.tqdm(total=len(res.item_collection()) * len(lst), desc="Fetching from Planetary")
    for item in res.items():
        if direction is not None:
            if item.properties['sat:orbit_state'] != direction:
                continue
        mfn = os.path.join(output_fld, item.assets['vh'].get_absolute_href().split('?')[0].split('/')[10])
        mfn = os.path.join(mfn, 'item_properties.json')
        os.makedirs(os.path.dirname(mfn), exist_ok=True)
        with open(mfn, 'w') as js:
            json.dump(item.properties, js)
        with rasterio.open(item.assets['vh'].href) as ds:
            img_srs = ds.crs.to_epsg()
        if shp_srs == img_srs:
            o_bbox = [20 * round(x / 20) for x in i_bbox]
        else:
            tr = T.from_crs(shp_srs, img_srs, always_xy=True)
            x_min, y_min = tr.transform(i_bbox[0], i_bbox[1])
            x_max, y_max = tr.transform(i_bbox[2], i_bbox[3])
            o_bbox = [20 * round(x / 20) for x in [x_min, y_min, x_max, y_max]]

        for a in lst:
            ofn = os.path.join(output_fld, '/'.join(item.assets[a].get_absolute_href().split('?')[0].split('/')[10:]))
            os.makedirs(os.path.dirname(ofn), exist_ok=True)
            with rasterio.open(item.assets[a].href) as img:
                out_img, out_geot = rasterio.mask.mask(img,
                                                       [Polygon.from_bounds(o_bbox[0], o_bbox[1],
                                                                            o_bbox[2], o_bbox[3])],
                                                       crop=True)
                out_meta = img.meta
            out_meta.update({"driver": "GTiff",
                             "height": out_img.shape[1],
                             "width": out_img.shape[2],
                             "transform": out_geot})
            with rasterio.open(ofn, "w", **out_meta) as dest:
                dest.write(out_img)
            prg.update()
    prg.close()

    return S1RTCPipeline(output_fld, ofn, direction=direction)

class S1RTCPipeline(S1GRDPipeline):
    # --- BEGIN SENSOR PROTOTYPE ---
    NAME = 'S1-IW-RTC'
    VAL_TYPE = otb.ImagePixelType_int16
    PTRN_dir = 'S1*_IW_GRD*'
    PTRN_ref = 'iw-v'
    VH_name = 'vh'
    PTRN = ['measurement/iw-vh.rtc.tiff', 'measurement/iw-vv.rtc.tiff']
    FEAT_exp = {
        'VH': 'im1b1',
        'VV': 'im2b1',
        'VH_db': '1000*log10(abs(im1b1)+1e-6)',
        'VV_db': '1000*log10(abs(im2b1)+1e-6)',
        'POL_RATIO': 'im1b1/im2b1'
    }
    NDT = -32768.0

    @classmethod
    def _check_direction(cls, x):
        with open(os.path.join(x, 'item_properties.json'), 'r') as js:
            dct = json.load(js)
        if dct['sat:orbit_state'] == 'ascending':
            return 0
        elif dct['sat:orbit_state'] == 'descending':
            return 1
        else:
            return None

    @classmethod
    def _get_stitched_filename(cls, fn):
        fnx = os.path.splitext(fn)
        return fnx[0] + '.stitched' + fnx[1]