Source code for dtcg.datacube.cryotempo_eolis

"""Copyright 2025 DTCG Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

=====

Functionality for retrieving and resampling CryoTEMPO-EOLIS derived elevation
change data.
"""

from __future__ import annotations

import logging
import os
from time import perf_counter

import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import shapely
import xarray as xr
from affine import Affine
from pandas import Timedelta
from pyproj import CRS, Proj
from rasterio import features
from salem.gis import Grid
from scipy.ndimage import gaussian_filter
from scipy.spatial.distance import cdist
from shapely.geometry import shape
from specklia import Specklia

os.environ["PROJ_LIB"] = pyproj.datadir.get_data_dir()
logger = logging.getLogger(__name__)


[docs] class DatacubeCryotempoEolis: """Functionality for adding EOLIS data to a OGGM datacube. Attributes ---------- SPECKLIA_DATASET_NAME_EOLIS_ELEVATION_CHANGE : str Name of the dataset in Specklia to add to the OGGM datacube. EOLIS_STATIC_KEYS : list[str] EOLIS metadata keys that are static across product files. EOLIS_PRODUCT_KEYS : list[str] EOLIS metadata keys that are unique for each product. """
[docs] def __init__(self: DatacubeCryotempoEolis): self.SPECKLIA_DATASET_NAME_EOLIS_ELEVATION_CHANGE = ( "CryoTEMPO-EOLIS Interpolated Elevation Change Maps" ) self.EOLIS_STATIC_KEYS = [ "Conventions", "DOI", "Metadata_Conventions", "baseline", "cdm_data_type", "comment", "contact", "creator_email", "creator_url", "institution", "keywords", "keywords_vocabulary", "platform", "processing_level", "project", "references", "region", "source", "summary", "title", "version", ] self.EOLIS_PRODUCT_KEYS = [ "science_pds_path", "Earth_Explorer_Header", "science_pds_download_link", "date_created", "date_modified", "geospatial_projection", "geospatial_resolution", "geospatial_resolution_units", "geospatial_x_max", "geospatial_x_min", "geospatial_x_units", "geospatial_y_max", "geospatial_y_min", "geospatial_y_units", "time_coverage_duration", "time_coverage_end", "time_coverage_start", "version", ]
[docs] def convert_gridded_dataframe_to_array( self: DatacubeCryotempoEolis, gridded_df: pd.DataFrame, value_column_names: list[str], x_coordinate_column: str, y_coordinate_column: str, spatial_resolution: float, xy_projection: Proj, y_affine_negative: bool = True, t_coordinate_column: str = None, ) -> tuple[dict[str, np.ndarray], Grid, np.ndarray | None]: """Resolve arrays from sparse gridded data stored in a dataframe. For each column name specified, an array (either 2d, or 3d if a time coordinate is supplied) is created using the x and y extent in the gridded dataframe and the provided resolution, and is populated with the sparse gridded data from the dataframe. Parameters ---------- gridded_df : pd.DataFrame DataFrame containing sparse gridded data, including coordinate columns and value columns. value_column_names : list[str] List of column names whose values are to be gridded. x_coordinate_column : str Name of the column containing x coordinates. y_coordinate_column : str Name of the column containing y coordinates. spatial_resolution : float Resolution to use when constructing the regular grid. xy_projection : Proj PyProj projection object representing the spatial reference system. y_affine_negative : bool, optional Whether to invert the y-axis when computing affine transformation, by default True. t_coordinate_column : str, optional Optional name of time coordinate column to create a 3D array, by default None. Returns ------- tuple[dict[str, np.ndarray], Grid, np.ndarray or None] - Dictionary of gridded arrays keyed by column name - Salem Grid object describing the spatial extent - Optional array of sorted time coordinates (if time is used) """ # Validate input for col in [x_coordinate_column, y_coordinate_column] + value_column_names: if col not in gridded_df.columns: raise ValueError( f"Missing required column '{col}' in gridded_df" ) unique_x_points = np.sort(gridded_df[x_coordinate_column].unique()) unique_y_points = np.sort(gridded_df[y_coordinate_column].unique()) # prepare coordinate axes desired_x_axis = np.arange( unique_x_points[0], unique_x_points[-1] + spatial_resolution, spatial_resolution, ) desired_y_axis = np.arange( unique_y_points[0], unique_y_points[-1] + spatial_resolution, spatial_resolution, ) # we need to offset both these axes by one in order to use # np.searchsorted() correctly below. y_idx = np.searchsorted(desired_y_axis, gridded_df[y_coordinate_column]) x_idx = np.searchsorted(desired_x_axis, gridded_df[x_coordinate_column]) # time axis handling if t_coordinate_column is not None: desired_t_axis = np.sort(gridded_df[t_coordinate_column].unique()) t_idx = np.searchsorted(desired_t_axis, gridded_df[t_coordinate_column]) else: desired_t_axis = None # Affine transformation if y_affine_negative: desired_y_axis = desired_y_axis[::-1] transform = Affine.translation( desired_x_axis.min() - spatial_resolution / 2, desired_y_axis.max() + spatial_resolution / 2, ) * Affine.scale(spatial_resolution, -spatial_resolution) else: transform = Affine.translation( desired_x_axis.min() - spatial_resolution / 2, desired_y_axis.min() - spatial_resolution / 2, ) * Affine.scale(spatial_resolution, spatial_resolution) # create the output raster, initially filling it with NaN output_arrays = {} for column_name in value_column_names: if desired_t_axis is not None: grid_arr = np.full( (len(desired_t_axis), len(desired_y_axis), len(desired_x_axis)), np.nan, ) grid_arr[t_idx, y_idx, x_idx] = gridded_df[column_name] else: grid_arr = np.full( (len(desired_y_axis), len(desired_x_axis)), np.nan ) grid_arr[y_idx, x_idx] = gridded_df[column_name] if y_affine_negative: grid_arr = np.flip(grid_arr, axis=-2) output_arrays[column_name] = grid_arr grid = Grid( proj=xy_projection, nxny=(len(desired_x_axis), len(desired_y_axis)), dxdy=(transform.a, transform.e), x0y0=(transform.c, transform.f), pixel_ref="center", ) return output_arrays, grid, desired_t_axis
[docs] def prepare_eolis_metadata( self: DatacubeCryotempoEolis, specklia_source_data: list[dict], preliminary_dataset: bool = True, ) -> dict: """Extract metadata from Specklia dataset source information. Parameters ---------- specklia_source_data : list[dict] List of source information dictionaries returned by Specklia. preliminary_dataset : bool, optional Whether to return all metadata from the first product without parsing, by default True. Returns ------- dict Dictionary of static metadata and product-specific attributes, grouped by filename. """ first_source = specklia_source_data[0].get("source_information", {}) if preliminary_dataset: return first_source metadata = { k: first_source.get(k) for k in self.EOLIS_STATIC_KEYS if k in first_source } product_attributes = {} for item in specklia_source_data: source_info = item.get("source_information", {}) science_path = source_info.get("science_pds_path", "") file_name = ( os.path.basename(science_path) if science_path else "unknown_file" ) product_attributes[file_name] = { ("original_" + k if "geospatial" in k else k): source_info[k] for k in self.EOLIS_PRODUCT_KEYS if k in source_info } metadata["product_attributes"] = product_attributes return metadata
[docs] def create_query_polygon( self: DatacubeCryotempoEolis, oggm_ds: xr.Dataset ) -> shapely.Polygon: """Create a WGS84-aligned polygon bounding box from the spatial extent of an OGGM dataset. Parameters ---------- oggm_ds : xr.Dataset OGGM dataset a geospatial CRS to extract extent from. Returns ------- shapely.Polygon Polygon in EPSG:4326 (lat/lon) describing the outer bounds of the OGGM dataset. """ # Create query polygon for Specklia, matching the extent of the oggm # data cube and then reprojected to WGS84 # should consider buffering this a bit to avoid edge features after # resampling if not oggm_ds.rio.crs: oggm_ds.rio.write_crs(oggm_ds.pyproj_srs, inplace=True) # Reproject the dataset to EPSG:4326 ds_ll = oggm_ds.rio.reproject("EPSG:4326") # build query polygon from bounds bounds = ds_ll.rio.bounds() return shapely.box(*bounds)
[docs] def retrieve_data_from_specklia( self: DatacubeCryotempoEolis, query_polygon: shapely.Polygon, specklia_data_set_name: str, specklia_api_key: str, min_timestamp: int = None, max_timestamp: int = None, ) -> tuple[gpd.GeoDataFrame, list[dict], pd.Series]: """Query and retrieve data from the Specklia API for a given spatial and temporal extent. Parameters ---------- query_polygon : shapely.Polygon WGS84 polygon representing the spatial area to query. specklia_data_set_name : str Name of the dataset to query from Specklia. specklia_api_key : str API key used to authenticate with the Specklia service. min_timestamp : int, optional Optional minimum timestamp to filter results, by default None. If not set then the minimum timestamp of the dataset in specklia will be used. max_timestamp : int, optional Optional maximum timestamp to filter results, by default None. If not set then the maximum timestamp of the dataset in specklia will be used. Returns ------- tuple[gpd.GeoDataFrame, list[dict], pd.Series] - GeoDataFrame of geospatial data queried from Specklia - List of metadata dicts for all data in the retrieved geodataframe - Dataset-level metadata for the Specklia dataset queried """ client = Specklia(specklia_api_key) available_datasets = client.list_datasets() matching_dataset = available_datasets[ available_datasets["dataset_name"] == specklia_data_set_name ] if matching_dataset.empty: raise ValueError( f"No dataset named '{specklia_data_set_name}' " "found in Specklia." ) specklia_dataset_info = matching_dataset.iloc[0] if min_timestamp is None: min_timestamp = specklia_dataset_info["min_timestamp"] - Timedelta( seconds=1 ) if max_timestamp is None: max_timestamp = specklia_dataset_info["max_timestamp"] + Timedelta( seconds=1 ) # query the data from Specklia query_start_time = perf_counter() gdf, source_info = client.query_dataset( dataset_id=specklia_dataset_info["dataset_id"], epsg4326_polygon=query_polygon, min_datetime=min_timestamp, max_datetime=max_timestamp, ) logger.debug( f"Specklia query took {perf_counter()-query_start_time:.2f} seconds" ) return gdf, source_info, specklia_dataset_info
[docs] def gaussian_filter_fill( self: DatacubeCryotempoEolis, arr: np.ndarray, sigma: float | tuple[float] = 3.0, **kwargs) -> np.ndarray: """Fill NaN regions in an array using Gaussian filter. The data and a validity mask are filtered separately and combined so that only NaNs are replaced, while original values remain unchanged. Parameters ---------- arr : np.ndarray Input array with NaNs. sigma : float | tuple[float] Standard deviation(s) for the Gaussian kernel, by default 3.0 **kwargs : dict Additional arguments passed to scipy.ndimage.gaussian_filter. Returns ------- np.ndarray Copy of arr with NaNs filled by Gaussian-smoothed neighbors. """ # Mask: 1 where valid, 0 where NaN mask = np.isfinite(arr) # Replace NaNs with 0 for convolution filled = np.nan_to_num(arr, nan=0.0) # Apply filter to data and mask filtered = gaussian_filter(filled, sigma=sigma, **kwargs) norm = gaussian_filter(mask.astype(float), sigma=sigma, **kwargs) # Avoid division by zero with np.errstate(invalid="ignore", divide="ignore"): result = filtered / norm result[norm == 0] = np.nan arr_filled = arr.copy() arr_filled[~mask] = result[~mask] return arr_filled
[docs] def retrieve_prepare_eolis_gridded_data( self: DatacubeCryotempoEolis, oggm_ds: xr.Dataset, grid: Grid ) -> xr.Dataset: """Retrieve EOLIS gridded elevation change data and resample it to the given OGGM grid. Parameters ---------- oggm_ds : xr.Dataset OGGM xarray dataset to which EOLIS data will be added. grid : Grid Grid object representing the OGGM grid projection and extent. Returns ------- xr.Dataset Modified OGGM dataset including resampled EOLIS elevation change and uncertainty arrays. """ # build query polygon from OGGM data cube specklia_query_polygon = self.create_query_polygon(oggm_ds) # query EOLIS dataset from specklia over full time period eolis_gridded_data, eolis_gridded_sources, eolis_gridded_dataset = ( self.retrieve_data_from_specklia( specklia_query_polygon, self.SPECKLIA_DATASET_NAME_EOLIS_ELEVATION_CHANGE, os.getenv("SPECKLIA_API_KEY"), ) ) elevation_change_var_name = "elevation_change" elevation_change_sigma_var_name = "elevation_change_sigma" # convert spare dataframe to data cube eolis_arrays, eolis_grid, time_coordinates = ( self.convert_gridded_dataframe_to_array( eolis_gridded_data, [elevation_change_var_name, elevation_change_sigma_var_name], "x", "y", np.nanmin(np.abs(np.diff(eolis_gridded_data.x.unique()))), eolis_gridded_sources[0]["source_information"]["xy_cols_proj4"], y_affine_negative=True, t_coordinate_column="timestamp", ) ) resampling_start_time = perf_counter() # add EOLIS elevation and uncertainty to OGGM data cube eolis_metadata = self.prepare_eolis_metadata(eolis_gridded_sources) for col in [elevation_change_var_name, elevation_change_sigma_var_name]: data_name = f"eolis_gridded_{col}" # buffer eolis grid using gaussian filter to avoid data loss at # the margins buffered_eolis_array = self.gaussian_filter_fill(eolis_arrays[col]) eolis_resampled_grid = grid.map_gridded_data( buffered_eolis_array, eolis_grid, interp="linear" ) column_info = [ d for d in eolis_gridded_dataset["columns"] if d["name"] == col ][0] eolis_resampled_grids_xarr = xr.DataArray( eolis_resampled_grid, coords={"t": time_coordinates, "y": oggm_ds.y, "x": oggm_ds.x}, dims=("t", "y", "x"), name=data_name, attrs={ "units": column_info["unit"], } | eolis_metadata[col], ) # mask to glacier if "glacier_mask" in oggm_ds.variables: eolis_resampled_grids_xarr = eolis_resampled_grids_xarr.where( oggm_ds.glacier_mask != 0) oggm_ds[data_name] = eolis_resampled_grids_xarr logger.debug( "Resampling of EOLIS and creation of dataset took " f"{perf_counter()-resampling_start_time:.2f} seconds" ) self.augment_dataset_with_1d_timeseries( eolis_gridded_data, oggm_ds, elevation_change_var_name, elevation_change_sigma_var_name ) return oggm_ds
[docs] def augment_dataset_with_1d_timeseries( self: DatacubeCryotempoEolis, eolis_gridded_data: gpd.GeoDataFrame, oggm_ds: xr.Dataset, elevation_change_var_name: str, elevation_change_sigma_var_name: str ) -> None: """Generate and attach 1D time series of elevation change and uncertainty to an OGGM dataset. This method masks gridded EOLIS data to the glacier extent, computes mean elevation change and propagated uncertainties per timestamp, and appends them to the input OGGM dataset as new time series variables. Parameters ---------- eolis_gridded_data : geopandas.GeoDataFrame EOLIS gridded data containing x/y locations, timestamps, and elevation change values. oggm_ds : xr.Dataset OGGM dataset to which new time series variables will be added. elevation_change_var_name : str Name of the column holding elevation change values in the input data. elevation_change_sigma_var_name : str Name of the column holding uncertainty values in the input data. Returns ------- None The input dataset is modified in place with new 1D time series variables. """ logger.debug("Generating 1D time series.") timeseries_gen_start_time = perf_counter() # mask dataset to glacier extent vector_glacier_mask = self.create_vector_glacier_mask( oggm_ds, eolis_gridded_data.crs ) eolis_gridded_data_masked = eolis_gridded_data[ eolis_gridded_data.intersects( vector_glacier_mask.geometry.union_all() ) ] # generate timeseries with uncertainties elevation_change_timeseries, error_timeseries = \ self.generate_1d_timeseries( eolis_gridded_data_masked, elevation_change_var_name, elevation_change_sigma_var_name ) # add timeseries to datacube timeseries_data = { elevation_change_var_name: { "data": elevation_change_timeseries, "source": "Values represent glacier-wide mean elevation" " change, computed as the average of all valid grid cells" f" (eolis_gridded_{elevation_change_var_name}) within the " "glacier mask.", "comment": f"Computed from eolis_gridded_{elevation_change_var_name}. " + oggm_ds[f"eolis_gridded_{elevation_change_var_name}"].attrs["comment"], "ancillary_var": elevation_change_sigma_var_name }, elevation_change_sigma_var_name: { "data": error_timeseries, "source": "Values represent propagated 1-sigma uncertainty" " of the glacier-wide mean, estimated from pixel-level" f" uncertainties (eolis_gridded_{elevation_change_sigma_var_name})" " using a spatial correlation model.", "comment": "Uncertainty propagated using a static 20 km " "spatial correlation length.", "ancillary_var": elevation_change_var_name }, } new_var_name_format = "eolis_{}_timeseries" for col, timeseries_dict in timeseries_data.items(): var_name = new_var_name_format.format(col) ancillary_var_name = new_var_name_format.format( timeseries_dict["ancillary_var"]) attrs = oggm_ds[f"eolis_gridded_{col}"].attrs.copy() attrs["ancillary_variables"] = ancillary_var_name attrs["comment"] = timeseries_dict["comment"] attrs["source"] += timeseries_dict["source"] eolis_resampled_grids_xarr = xr.DataArray( timeseries_dict["data"], coords={"t": oggm_ds.t}, dims=("t"), name=var_name, attrs=attrs ) oggm_ds[var_name] = eolis_resampled_grids_xarr logger.debug( "1D time series generation and addition to dataset took " f"{perf_counter()-timeseries_gen_start_time:.2f} seconds" )
[docs] def generate_1d_timeseries( self: DatacubeCryotempoEolis, eolis_gridded_data: gpd.GeoDataFrame, elevation_change_var_name: str, elevation_change_sigma_var_name: str, length_scale: int | float = 6e3 ) -> tuple[list[float], list[float]]: """Compute a 1D elevation change time series with propagated uncertainties. For each timestamp in the gridded dataset, the function calculates the mean elevation change across all valid pixels and estimates the uncertainty of the mean by building a covariance matrix that accounts for spatial correlation between points. Parameters ---------- eolis_gridded_data : gpd.GeoDataFrame EOLIS gridded data containing x/y locations, timestamps, and elevation change values. elevation_change_var_name : str Column name holding elevation change values. elevation_change_sigma_var_name : str Column name holding uncertainty values. length_scale : int | float, optional Spatial correlation length scale in eolis_gridded_data geometry crs units used to construct the exponential decay kernel, by default 2e4. Returns ------- tuple[list[float], list[float]] Two lists with the same length as the number of unique timestamps: - Mean elevation change per timestamp. - Propagated uncertainty per timestamp. """ grouped_data = eolis_gridded_data.groupby('timestamp', sort=True) timestamps = [t[0] for t in grouped_data] gridded_product_data_grouped = [t[1] for t in grouped_data] elevation_change_timeseries = [] error_timeseries = [] for i in range(len(timestamps)): gridded_data_this_timestamp = gridded_product_data_grouped[i].loc[ gridded_product_data_grouped[i][ elevation_change_var_name].notna() ] mean = gridded_data_this_timestamp[elevation_change_var_name].mean() uncertainty = gridded_data_this_timestamp[ elevation_change_sigma_var_name].astype(float).to_numpy() coords = gridded_data_this_timestamp[ ['x', 'y']].astype(float).to_numpy() # Build correlation matrix (exponential decay kernel) pairwise_dist = cdist(coords, coords) correlation = np.exp(-pairwise_dist / length_scale) # Covariance matrix cov_matrix = np.outer(uncertainty, uncertainty) * correlation # Uniform weights n_vals = len(gridded_data_this_timestamp) weights = np.ones(n_vals) / n_vals # Compute variance of the weighted mean var_mean = weights @ cov_matrix @ weights std_mean = np.sqrt(var_mean) elevation_change_timeseries.append(mean) error_timeseries.append(std_mean) return elevation_change_timeseries, error_timeseries
[docs] def create_vector_glacier_mask( self: DatacubeCryotempoEolis, oggm_ds: xr.Dataset, target_crs: CRS ) -> gpd.GeoDataFrame: """Vectorise the glacier mask of an OGGM dataset into polygon(s). The raster glacier mask is converted to shapely polygons using rasterio.features.shapes, dissolved into a single geometry, and reprojected to the target CRS. Parameters ---------- oggm_ds : xr.Dataset OGGM dataset containing a glacier_mask DataArray and CRS. target_crs : CRS Target coordinate reference system for the output polygons. Returns ------- gpd.GeoDataFrame GeoDataFrame containing the glacier mask polygon(s) in the specified CRS. """ # Extract the mask mask = oggm_ds.glacier_mask.values.astype("uint8") # Vectorise shapes = features.shapes(mask, mask=mask.astype(bool), transform=oggm_ds.glacier_mask.rio.transform()) gdf = gpd.GeoDataFrame( geometry=[shape(geom) for geom, val in shapes if val == 1], crs=oggm_ds.rio.crs ) gdf = gdf.dissolve() gdf_reproj = gdf.to_crs(target_crs) return gdf_reproj