"""Copyright 2025 DTCG Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
=====
Functionality for retrieving and resampling CryoTEMPO-EOLIS derived elevation
change data.
"""
from __future__ import annotations
import logging
import os
from time import perf_counter
import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import shapely
import xarray as xr
from affine import Affine
from pandas import Timedelta
from pyproj import CRS, Proj
from rasterio import features
from salem.gis import Grid
from scipy.ndimage import gaussian_filter
from scipy.spatial.distance import cdist
from shapely.geometry import shape
from specklia import Specklia
os.environ["PROJ_LIB"] = pyproj.datadir.get_data_dir()
logger = logging.getLogger(__name__)
[docs]
class DatacubeCryotempoEolis:
"""Functionality for adding EOLIS data to a OGGM datacube.
Attributes
----------
SPECKLIA_DATASET_NAME_EOLIS_ELEVATION_CHANGE : str
Name of the dataset in Specklia to add to the OGGM datacube.
EOLIS_STATIC_KEYS : list[str]
EOLIS metadata keys that are static across product files.
EOLIS_PRODUCT_KEYS : list[str]
EOLIS metadata keys that are unique for each product.
"""
[docs]
def __init__(self: DatacubeCryotempoEolis):
self.SPECKLIA_DATASET_NAME_EOLIS_ELEVATION_CHANGE = (
"CryoTEMPO-EOLIS Interpolated Elevation Change Maps"
)
self.EOLIS_STATIC_KEYS = [
"Conventions",
"DOI",
"Metadata_Conventions",
"baseline",
"cdm_data_type",
"comment",
"contact",
"creator_email",
"creator_url",
"institution",
"keywords",
"keywords_vocabulary",
"platform",
"processing_level",
"project",
"references",
"region",
"source",
"summary",
"title",
"version",
]
self.EOLIS_PRODUCT_KEYS = [
"science_pds_path",
"Earth_Explorer_Header",
"science_pds_download_link",
"date_created",
"date_modified",
"geospatial_projection",
"geospatial_resolution",
"geospatial_resolution_units",
"geospatial_x_max",
"geospatial_x_min",
"geospatial_x_units",
"geospatial_y_max",
"geospatial_y_min",
"geospatial_y_units",
"time_coverage_duration",
"time_coverage_end",
"time_coverage_start",
"version",
]
[docs]
def convert_gridded_dataframe_to_array(
self: DatacubeCryotempoEolis,
gridded_df: pd.DataFrame,
value_column_names: list[str],
x_coordinate_column: str,
y_coordinate_column: str,
spatial_resolution: float,
xy_projection: Proj,
y_affine_negative: bool = True,
t_coordinate_column: str = None,
) -> tuple[dict[str, np.ndarray], Grid, np.ndarray | None]:
"""Resolve arrays from sparse gridded data stored in a dataframe.
For each column name specified, an array (either 2d, or 3d if a time
coordinate is supplied) is created using the
x and y extent in the gridded dataframe and the provided resolution,
and is populated with the sparse gridded data from the dataframe.
Parameters
----------
gridded_df : pd.DataFrame
DataFrame containing sparse gridded data, including coordinate
columns and value columns.
value_column_names : list[str]
List of column names whose values are to be gridded.
x_coordinate_column : str
Name of the column containing x coordinates.
y_coordinate_column : str
Name of the column containing y coordinates.
spatial_resolution : float
Resolution to use when constructing the regular grid.
xy_projection : Proj
PyProj projection object representing the spatial reference system.
y_affine_negative : bool, optional
Whether to invert the y-axis when computing affine transformation,
by default True.
t_coordinate_column : str, optional
Optional name of time coordinate column to create a 3D array,
by default None.
Returns
-------
tuple[dict[str, np.ndarray], Grid, np.ndarray or None]
- Dictionary of gridded arrays keyed by column name
- Salem Grid object describing the spatial extent
- Optional array of sorted time coordinates (if time is used)
"""
# Validate input
for col in [x_coordinate_column,
y_coordinate_column] + value_column_names:
if col not in gridded_df.columns:
raise ValueError(
f"Missing required column '{col}' in gridded_df"
)
unique_x_points = np.sort(gridded_df[x_coordinate_column].unique())
unique_y_points = np.sort(gridded_df[y_coordinate_column].unique())
# prepare coordinate axes
desired_x_axis = np.arange(
unique_x_points[0],
unique_x_points[-1] + spatial_resolution,
spatial_resolution,
)
desired_y_axis = np.arange(
unique_y_points[0],
unique_y_points[-1] + spatial_resolution,
spatial_resolution,
)
# we need to offset both these axes by one in order to use
# np.searchsorted() correctly below.
y_idx = np.searchsorted(desired_y_axis, gridded_df[y_coordinate_column])
x_idx = np.searchsorted(desired_x_axis, gridded_df[x_coordinate_column])
# time axis handling
if t_coordinate_column is not None:
desired_t_axis = np.sort(gridded_df[t_coordinate_column].unique())
t_idx = np.searchsorted(desired_t_axis,
gridded_df[t_coordinate_column])
else:
desired_t_axis = None
# Affine transformation
if y_affine_negative:
desired_y_axis = desired_y_axis[::-1]
transform = Affine.translation(
desired_x_axis.min() - spatial_resolution / 2,
desired_y_axis.max() + spatial_resolution / 2,
) * Affine.scale(spatial_resolution, -spatial_resolution)
else:
transform = Affine.translation(
desired_x_axis.min() - spatial_resolution / 2,
desired_y_axis.min() - spatial_resolution / 2,
) * Affine.scale(spatial_resolution, spatial_resolution)
# create the output raster, initially filling it with NaN
output_arrays = {}
for column_name in value_column_names:
if desired_t_axis is not None:
grid_arr = np.full(
(len(desired_t_axis),
len(desired_y_axis),
len(desired_x_axis)),
np.nan,
)
grid_arr[t_idx, y_idx, x_idx] = gridded_df[column_name]
else:
grid_arr = np.full(
(len(desired_y_axis), len(desired_x_axis)), np.nan
)
grid_arr[y_idx, x_idx] = gridded_df[column_name]
if y_affine_negative:
grid_arr = np.flip(grid_arr, axis=-2)
output_arrays[column_name] = grid_arr
grid = Grid(
proj=xy_projection,
nxny=(len(desired_x_axis), len(desired_y_axis)),
dxdy=(transform.a, transform.e),
x0y0=(transform.c, transform.f),
pixel_ref="center",
)
return output_arrays, grid, desired_t_axis
[docs]
def create_query_polygon(
self: DatacubeCryotempoEolis, oggm_ds: xr.Dataset
) -> shapely.Polygon:
"""Create a WGS84-aligned polygon bounding box from the spatial extent
of an OGGM dataset.
Parameters
----------
oggm_ds : xr.Dataset
OGGM dataset a geospatial CRS to extract extent from.
Returns
-------
shapely.Polygon
Polygon in EPSG:4326 (lat/lon) describing the outer bounds of the
OGGM dataset.
"""
# Create query polygon for Specklia, matching the extent of the oggm
# data cube and then reprojected to WGS84
# should consider buffering this a bit to avoid edge features after
# resampling
if not oggm_ds.rio.crs:
oggm_ds.rio.write_crs(oggm_ds.pyproj_srs, inplace=True)
# Reproject the dataset to EPSG:4326
ds_ll = oggm_ds.rio.reproject("EPSG:4326")
# build query polygon from bounds
bounds = ds_ll.rio.bounds()
return shapely.box(*bounds)
[docs]
def retrieve_data_from_specklia(
self: DatacubeCryotempoEolis,
query_polygon: shapely.Polygon,
specklia_data_set_name: str,
specklia_api_key: str,
min_timestamp: int = None,
max_timestamp: int = None,
) -> tuple[gpd.GeoDataFrame, list[dict], pd.Series]:
"""Query and retrieve data from the Specklia API for a given spatial and
temporal extent.
Parameters
----------
query_polygon : shapely.Polygon
WGS84 polygon representing the spatial area to query.
specklia_data_set_name : str
Name of the dataset to query from Specklia.
specklia_api_key : str
API key used to authenticate with the Specklia service.
min_timestamp : int, optional
Optional minimum timestamp to filter results, by default None.
If not set then the minimum timestamp of the dataset in specklia
will be used.
max_timestamp : int, optional
Optional maximum timestamp to filter results, by default None.
If not set then the maximum timestamp of the dataset in specklia
will be used.
Returns
-------
tuple[gpd.GeoDataFrame, list[dict], pd.Series]
- GeoDataFrame of geospatial data queried from Specklia
- List of metadata dicts for all data in the retrieved geodataframe
- Dataset-level metadata for the Specklia dataset queried
"""
client = Specklia(specklia_api_key)
available_datasets = client.list_datasets()
matching_dataset = available_datasets[
available_datasets["dataset_name"] == specklia_data_set_name
]
if matching_dataset.empty:
raise ValueError(
f"No dataset named '{specklia_data_set_name}' "
"found in Specklia."
)
specklia_dataset_info = matching_dataset.iloc[0]
if min_timestamp is None:
min_timestamp = specklia_dataset_info["min_timestamp"] - Timedelta(
seconds=1
)
if max_timestamp is None:
max_timestamp = specklia_dataset_info["max_timestamp"] + Timedelta(
seconds=1
)
# query the data from Specklia
query_start_time = perf_counter()
gdf, source_info = client.query_dataset(
dataset_id=specklia_dataset_info["dataset_id"],
epsg4326_polygon=query_polygon,
min_datetime=min_timestamp,
max_datetime=max_timestamp,
)
logger.debug(
f"Specklia query took {perf_counter()-query_start_time:.2f} seconds"
)
return gdf, source_info, specklia_dataset_info
[docs]
def gaussian_filter_fill(
self: DatacubeCryotempoEolis, arr: np.ndarray,
sigma: float | tuple[float] = 3.0, **kwargs) -> np.ndarray:
"""Fill NaN regions in an array using Gaussian filter.
The data and a validity mask are filtered separately and combined so
that only NaNs are replaced, while original values remain unchanged.
Parameters
----------
arr : np.ndarray
Input array with NaNs.
sigma : float | tuple[float]
Standard deviation(s) for the Gaussian kernel, by default 3.0
**kwargs : dict
Additional arguments passed to scipy.ndimage.gaussian_filter.
Returns
-------
np.ndarray
Copy of arr with NaNs filled by Gaussian-smoothed neighbors.
"""
# Mask: 1 where valid, 0 where NaN
mask = np.isfinite(arr)
# Replace NaNs with 0 for convolution
filled = np.nan_to_num(arr, nan=0.0)
# Apply filter to data and mask
filtered = gaussian_filter(filled, sigma=sigma, **kwargs)
norm = gaussian_filter(mask.astype(float), sigma=sigma, **kwargs)
# Avoid division by zero
with np.errstate(invalid="ignore", divide="ignore"):
result = filtered / norm
result[norm == 0] = np.nan
arr_filled = arr.copy()
arr_filled[~mask] = result[~mask]
return arr_filled
[docs]
def retrieve_prepare_eolis_gridded_data(
self: DatacubeCryotempoEolis, oggm_ds: xr.Dataset, grid: Grid
) -> xr.Dataset:
"""Retrieve EOLIS gridded elevation change data and resample it to the
given OGGM grid.
Parameters
----------
oggm_ds : xr.Dataset
OGGM xarray dataset to which EOLIS data will be added.
grid : Grid
Grid object representing the OGGM grid projection and extent.
Returns
-------
xr.Dataset
Modified OGGM dataset including resampled EOLIS elevation change and
uncertainty arrays.
"""
# build query polygon from OGGM data cube
specklia_query_polygon = self.create_query_polygon(oggm_ds)
# query EOLIS dataset from specklia over full time period
eolis_gridded_data, eolis_gridded_sources, eolis_gridded_dataset = (
self.retrieve_data_from_specklia(
specklia_query_polygon,
self.SPECKLIA_DATASET_NAME_EOLIS_ELEVATION_CHANGE,
os.getenv("SPECKLIA_API_KEY"),
)
)
elevation_change_var_name = "elevation_change"
elevation_change_sigma_var_name = "elevation_change_sigma"
# convert spare dataframe to data cube
eolis_arrays, eolis_grid, time_coordinates = (
self.convert_gridded_dataframe_to_array(
eolis_gridded_data,
[elevation_change_var_name, elevation_change_sigma_var_name],
"x",
"y",
np.nanmin(np.abs(np.diff(eolis_gridded_data.x.unique()))),
eolis_gridded_sources[0]["source_information"]["xy_cols_proj4"],
y_affine_negative=True,
t_coordinate_column="timestamp",
)
)
resampling_start_time = perf_counter()
# add EOLIS elevation and uncertainty to OGGM data cube
eolis_metadata = self.prepare_eolis_metadata(eolis_gridded_sources)
for col in [elevation_change_var_name, elevation_change_sigma_var_name]:
data_name = f"eolis_gridded_{col}"
# buffer eolis grid using gaussian filter to avoid data loss at
# the margins
buffered_eolis_array = self.gaussian_filter_fill(eolis_arrays[col])
eolis_resampled_grid = grid.map_gridded_data(
buffered_eolis_array, eolis_grid, interp="linear"
)
column_info = [
d for d in eolis_gridded_dataset["columns"] if d["name"] == col
][0]
eolis_resampled_grids_xarr = xr.DataArray(
eolis_resampled_grid,
coords={"t": time_coordinates, "y": oggm_ds.y, "x": oggm_ds.x},
dims=("t", "y", "x"),
name=data_name,
attrs={
"units": column_info["unit"],
} | eolis_metadata[col],
)
# mask to glacier
if "glacier_mask" in oggm_ds.variables:
eolis_resampled_grids_xarr = eolis_resampled_grids_xarr.where(
oggm_ds.glacier_mask != 0)
oggm_ds[data_name] = eolis_resampled_grids_xarr
logger.debug(
"Resampling of EOLIS and creation of dataset took "
f"{perf_counter()-resampling_start_time:.2f} seconds"
)
self.augment_dataset_with_1d_timeseries(
eolis_gridded_data,
oggm_ds,
elevation_change_var_name,
elevation_change_sigma_var_name
)
return oggm_ds
[docs]
def augment_dataset_with_1d_timeseries(
self: DatacubeCryotempoEolis,
eolis_gridded_data: gpd.GeoDataFrame,
oggm_ds: xr.Dataset,
elevation_change_var_name: str,
elevation_change_sigma_var_name: str
) -> None:
"""Generate and attach 1D time series of elevation change and
uncertainty to an OGGM dataset.
This method masks gridded EOLIS data to the glacier extent,
computes mean elevation change and propagated uncertainties per
timestamp, and appends them to the input OGGM dataset as new
time series variables.
Parameters
----------
eolis_gridded_data : geopandas.GeoDataFrame
EOLIS gridded data containing x/y locations, timestamps,
and elevation change values.
oggm_ds : xr.Dataset
OGGM dataset to which new time series variables will be added.
elevation_change_var_name : str
Name of the column holding elevation change values in the input
data.
elevation_change_sigma_var_name : str
Name of the column holding uncertainty values in the input
data.
Returns
-------
None
The input dataset is modified in place with new 1D time series
variables.
"""
logger.debug("Generating 1D time series.")
timeseries_gen_start_time = perf_counter()
# mask dataset to glacier extent
vector_glacier_mask = self.create_vector_glacier_mask(
oggm_ds, eolis_gridded_data.crs
)
eolis_gridded_data_masked = eolis_gridded_data[
eolis_gridded_data.intersects(
vector_glacier_mask.geometry.union_all()
)
]
# generate timeseries with uncertainties
elevation_change_timeseries, error_timeseries = \
self.generate_1d_timeseries(
eolis_gridded_data_masked,
elevation_change_var_name,
elevation_change_sigma_var_name
)
# add timeseries to datacube
timeseries_data = {
elevation_change_var_name: {
"data": elevation_change_timeseries,
"source": "Values represent glacier-wide mean elevation"
" change, computed as the average of all valid grid cells"
f" (eolis_gridded_{elevation_change_var_name}) within the "
"glacier mask.",
"comment": f"Computed from eolis_gridded_{elevation_change_var_name}. "
+ oggm_ds[f"eolis_gridded_{elevation_change_var_name}"].attrs["comment"],
"ancillary_var": elevation_change_sigma_var_name
},
elevation_change_sigma_var_name: {
"data": error_timeseries,
"source": "Values represent propagated 1-sigma uncertainty"
" of the glacier-wide mean, estimated from pixel-level"
f" uncertainties (eolis_gridded_{elevation_change_sigma_var_name})"
" using a spatial correlation model.",
"comment": "Uncertainty propagated using a static 20 km "
"spatial correlation length.",
"ancillary_var": elevation_change_var_name
},
}
new_var_name_format = "eolis_{}_timeseries"
for col, timeseries_dict in timeseries_data.items():
var_name = new_var_name_format.format(col)
ancillary_var_name = new_var_name_format.format(
timeseries_dict["ancillary_var"])
attrs = oggm_ds[f"eolis_gridded_{col}"].attrs.copy()
attrs["ancillary_variables"] = ancillary_var_name
attrs["comment"] = timeseries_dict["comment"]
attrs["source"] += timeseries_dict["source"]
eolis_resampled_grids_xarr = xr.DataArray(
timeseries_dict["data"],
coords={"t": oggm_ds.t},
dims=("t"),
name=var_name,
attrs=attrs
)
oggm_ds[var_name] = eolis_resampled_grids_xarr
logger.debug(
"1D time series generation and addition to dataset took "
f"{perf_counter()-timeseries_gen_start_time:.2f} seconds"
)
[docs]
def generate_1d_timeseries(
self: DatacubeCryotempoEolis,
eolis_gridded_data: gpd.GeoDataFrame,
elevation_change_var_name: str,
elevation_change_sigma_var_name: str,
length_scale: int | float = 6e3
) -> tuple[list[float], list[float]]:
"""Compute a 1D elevation change time series with propagated
uncertainties.
For each timestamp in the gridded dataset, the function calculates
the mean elevation change across all valid pixels and estimates the
uncertainty of the mean by building a covariance matrix that accounts
for spatial correlation between points.
Parameters
----------
eolis_gridded_data : gpd.GeoDataFrame
EOLIS gridded data containing x/y locations, timestamps,
and elevation change values.
elevation_change_var_name : str
Column name holding elevation change values.
elevation_change_sigma_var_name : str
Column name holding uncertainty values.
length_scale : int | float, optional
Spatial correlation length scale in eolis_gridded_data
geometry crs units used to construct the exponential
decay kernel, by default 2e4.
Returns
-------
tuple[list[float], list[float]]
Two lists with the same length as the number of unique timestamps:
- Mean elevation change per timestamp.
- Propagated uncertainty per timestamp.
"""
grouped_data = eolis_gridded_data.groupby('timestamp', sort=True)
timestamps = [t[0] for t in grouped_data]
gridded_product_data_grouped = [t[1] for t in grouped_data]
elevation_change_timeseries = []
error_timeseries = []
for i in range(len(timestamps)):
gridded_data_this_timestamp = gridded_product_data_grouped[i].loc[
gridded_product_data_grouped[i][
elevation_change_var_name].notna()
]
mean = gridded_data_this_timestamp[elevation_change_var_name].mean()
uncertainty = gridded_data_this_timestamp[
elevation_change_sigma_var_name].astype(float).to_numpy()
coords = gridded_data_this_timestamp[
['x', 'y']].astype(float).to_numpy()
# Build correlation matrix (exponential decay kernel)
pairwise_dist = cdist(coords, coords)
correlation = np.exp(-pairwise_dist / length_scale)
# Covariance matrix
cov_matrix = np.outer(uncertainty, uncertainty) * correlation
# Uniform weights
n_vals = len(gridded_data_this_timestamp)
weights = np.ones(n_vals) / n_vals
# Compute variance of the weighted mean
var_mean = weights @ cov_matrix @ weights
std_mean = np.sqrt(var_mean)
elevation_change_timeseries.append(mean)
error_timeseries.append(std_mean)
return elevation_change_timeseries, error_timeseries
[docs]
def create_vector_glacier_mask(
self: DatacubeCryotempoEolis,
oggm_ds: xr.Dataset,
target_crs: CRS
) -> gpd.GeoDataFrame:
"""Vectorise the glacier mask of an OGGM dataset into polygon(s).
The raster glacier mask is converted to shapely polygons using
rasterio.features.shapes, dissolved into a single geometry, and
reprojected to the target CRS.
Parameters
----------
oggm_ds : xr.Dataset
OGGM dataset containing a glacier_mask DataArray and CRS.
target_crs : CRS
Target coordinate reference system for the output polygons.
Returns
-------
gpd.GeoDataFrame
GeoDataFrame containing the glacier mask polygon(s) in the
specified CRS.
"""
# Extract the mask
mask = oggm_ds.glacier_mask.values.astype("uint8")
# Vectorise
shapes = features.shapes(mask,
mask=mask.astype(bool),
transform=oggm_ds.glacier_mask.rio.transform())
gdf = gpd.GeoDataFrame(
geometry=[shape(geom) for geom, val in shapes if val == 1],
crs=oggm_ds.rio.crs
)
gdf = gdf.dissolve()
gdf_reproj = gdf.to_crs(target_crs)
return gdf_reproj