"""Copyright 2025 DTCG Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
=====
Functionality for retrieving ENVEO surface type classification data.
"""
import warnings
import numpy as np
import pandas as pd
import salem
import xarray as xr
from oggm import utils
[docs]
class DatacubeSurfaceType:
[docs]
def __init__(self):
self.data_base_url = (
"https://cluster.klima.uni-bremen.de/~dtcg/test_files/"
"case_study_regions/austria/sfc_type_obs/merged_data/"
)
self.files_used = [
"20150704_R022.tif",
"20150803_R022.tif",
"20150813_R022.tif",
"20150826_R065.tif",
"20151022_R022.tif",
"20160522_R065.tif",
"20160628_R022.tif",
"20160718_R022.tif",
"20160807_R022.tif",
"20160827_R022.tif",
"20160909_R065.tif",
"20160926_R022.tif",
"20161016_R022.tif",
"20161029_R065.tif",
"20170517_R065.tif",
"20170527_R065.tif",
"20170613_R022.tif",
"20170626_R065.tif",
"20170706_R065.tif",
"20170716_R065.tif",
"20170805_R065.tif",
"20170807_R022.tif",
"20170822_R022.tif",
"20170825_R065.tif",
"20170830_R065.tif",
"20170921_R022.tif",
"20180616_R065.tif",
"20180623_R022.tif",
"20180701_R065.tif",
"20180713_R022.tif",
"20180718_R022.tif",
"20180726_R065.tif",
"20180731_R065.tif",
"20180817_R022.tif",
"20180820_R065.tif",
"20180827_R022.tif",
"20180919_R065.tif",
"20180921_R022.tif",
"20180926_R022.tif",
"20190502_R065.tif",
"20190524_R022.tif",
"20190601_R065.tif",
"20190603_R022.tif",
"20190613_R022.tif",
"20190618_R022.tif",
"20190628_R022.tif",
"20190716_R065.tif",
"20190723_R022.tif",
"20190827_R022.tif",
"20190830_R065.tif",
"20190904_R065.tif",
"20190916_R022.tif",
"20190921_R022.tif",
"20190929_R065.tif",
"20200521_R065.tif",
"20200602_R022.tif",
"20200612_R022.tif",
"20200627_R022.tif",
"20200707_R022.tif",
"20200712_R022.tif",
"20200727_R022.tif",
"20200801_R022.tif",
"20200806_R022.tif",
"20200809_R065.tif",
"20200811_R022.tif",
"20200819_R065.tif",
"20200821_R022.tif",
"20200903_R065.tif",
"20200905_R022.tif",
"20200908_R065.tif",
"20200913_R065.tif",
"20200915_R022.tif",
"20210811_R022.tif",
"20210814_R065.tif",
"20210821_R022.tif",
"20210903_R065.tif",
"20210905_R022.tif",
"20210910_R022.tif",
"20210913_R065.tif",
"20210918_R065.tif",
"20210923_R065.tif",
"20210925_R022.tif",
]
# the above file handling can be improved in the future, e.g. by using:
# import requests
# from bs4 import BeautifulSoup
#
# response = requests.get(data_base_url)
# soup = BeautifulSoup(response.text, "html.parser")
#
# files_used = []
# sfc_type_dates = []
# for link in soup.find_all("a", href=True):
# file = link['href']
# if file.lower().endswith('.tif'):
# date = file.split('_')[0]
# if date in sfc_type_dates:
# # there is already an observation for this date available
# raise ValueError(f'Surface type for {date} already added! '
# 'Check the provided input files, for each '
# 'date only one should be provided!')
# sfc_type_dates.append(date)
# files_used.append(file)
[docs]
def add_data(
self, datacube, gdir, bin_interval=30, snow_cover_thresholds: list | None = None
):
"""
Every datacube must support this method. It should be able to add
data to the provided datacube and returns the final datacube.
Parameters
----------
datacube
gdir
bin_interval
snow_cover_thresholds
"""
# this try except statement is quick and dirty, could be more elegant in
# the future
if not snow_cover_thresholds:
snow_cover_thresholds = [0.25, 0.5, 0.75]
try:
return self.add_sfc_type_observation(
ds=datacube,
gdir=gdir,
bin_interval=bin_interval,
snow_cover_thresholds=snow_cover_thresholds,
)
except RuntimeError:
print(f"No surface type observation available for {gdir.rgi_id}")
# This function is based on oggm.shop.millan22._filter_and_reproj
[docs]
def reproject_single_sfc_type_file(self, gdir, input_file):
"""
Parameters
----------
gdir
input_file
Returns
-------
"""
# Subset to avoid mega files
dsb = salem.GeoTiff(input_file)
x0, x1, y0, y1 = gdir.grid.extent_in_crs(dsb.grid.proj)
with warnings.catch_warnings():
# This can trigger an out-of-bounds warning
warnings.filterwarnings(
"ignore", category=RuntimeWarning, message=".*out of bounds.*"
)
dsb.set_subset(corners=((x0, y0), (x1, y1)), crs=dsb.grid.proj, margin=5)
data_sfc_types = dsb.get_vardata(var_id=1)
data_uncertainty = dsb.get_vardata(var_id=2)
# Reproject now
with warnings.catch_warnings():
# This can trigger an out-of-bounds warning
warnings.filterwarnings(
"ignore", category=RuntimeWarning, message=".*out of bounds.*"
)
r_data_sfc_types = gdir.grid.map_gridded_data(
data_sfc_types, dsb.grid, interp="nearest"
)
r_data_uncertainty = gdir.grid.map_gridded_data(
data_uncertainty, dsb.grid, interp="nearest"
)
return r_data_sfc_types.data, r_data_uncertainty.data
[docs]
def add_sfc_type_observation(
self, ds, gdir, bin_interval=30, snow_cover_thresholds: list | None = None
):
"""
Parameters
----------
ds
gdir
bin_interval
snow_cover_thresholds
Returns
-------
"""
# prepare structure for data
sfc_type_data = np.zeros((len(self.files_used), *ds["glacier_mask"].shape))
sfc_type_uncertainty = np.zeros(
(len(self.files_used), *ds["glacier_mask"].shape)
)
# loop through all files and add one after the other
for i, filename in enumerate(self.files_used):
# download data
input_file = utils.file_downloader(self.data_base_url + filename)
r_data, r_uncertainty = self.reproject_single_sfc_type_file(
gdir, input_file
)
sfc_type_data[i, :] = r_data
sfc_type_uncertainty[i, :] = r_uncertainty
# use nan for missing data
missing_data_val = np.nan
sfc_type_data = np.where(sfc_type_data == 255, missing_data_val, sfc_type_data)
sfc_type_uncertainty = np.where(
sfc_type_uncertainty == 255, missing_data_val, sfc_type_uncertainty
)
# add to gridded data with some attributes
sfc_type_dates = [file.split("_")[0] for file in self.files_used]
ds.coords["t_sfc_type"] = pd.to_datetime(sfc_type_dates, format="%Y%m%d")
ds["t_sfc_type"].attrs = {"long_name": "Timestamp of surface type observations"}
ds["sfc_type_data"] = (("t_sfc_type", "y", "x"), sfc_type_data)
ds["sfc_type_data"].attrs = {
"long_name": "Glacier surface classification",
"data_source": "ENVEO",
"units": "none",
"code_description": "Extract dict with eval(ds.code).",
"code": str(
{
0: "unclassified",
1: "snow",
2: "firn / old snow / bright ice",
3: "clean ice",
4: "debris",
5: "cloud",
"nan": "no data",
}
),
}
ds["sfc_type_uncertainty"] = (("t_sfc_type", "y", "x"), sfc_type_uncertainty)
ds["sfc_type_uncertainty"].attrs = {
"long_name": "Glacier surface classification uncertainty",
"data_source": "ENVEO",
"units": "none",
"code_description": "Extract dict with eval(ds.code).",
"code": str(
{
1: "low uncertainty for illuminated pixel",
2: "medium uncertainty for illuminated pixel",
3: "high uncertainty for illuminated pixel",
5: "cloud",
11: "low uncertainty for shaded pixel",
12: "medium uncertainty for shaded pixel",
13: "high uncertainty for shaded pixel",
"nan": "no data",
}
),
}
ds = ds.sortby("t_sfc_type")
if not snow_cover_thresholds:
snow_cover_thresholds = [0.25, 0.5, 0.75]
ds = self.add_snowline_from_observation(
gdir,
ds,
bin_interval=bin_interval,
snow_cover_thresholds=snow_cover_thresholds,
)
# convert time coordinate
epoch = np.datetime64("1970-01-01T00:00:00")
time = ds["t_sfc_type"]
seconds = (time - epoch).astype("timedelta64[s]").astype("int64")
seconds.attrs = {
"long_name": time.attrs.get("long_name", ""),
"standard_name": "time",
"units": "seconds since 1970-01-01 00:00:00",
}
ds = ds.assign_coords(t_sfc_type=seconds)
return ds
[docs]
def add_snowline_from_observation(
self,
gdir,
ds,
bin_interval: int = 30,
snow_cover_thresholds: list | None = None,
):
"""
Parameters
----------
gdir
ds
bin_interval
snow_cover_thresholds
Returns
-------
"""
# currently we only support 3 thresholds, but can be adapted below
if not snow_cover_thresholds:
snow_cover_thresholds = [0.25, 0.5, 0.75]
elif len(snow_cover_thresholds) != 3:
raise NotImplementedError("Only a list of three thresholds is supported.")
ds_count = get_categories_per_elevation_band(ds=ds, bin_interval=bin_interval)
ds_count = exclude_empty_elevation_bins(ds=ds_count)
ds_count = exclude_dates_with_too_much_cloud_cover(ds=ds_count)
snowline_obs = []
snowline_obs_lower = []
snowline_obs_upper = []
for date in ds.t_sfc_type:
if date.values not in ds_count.t_sfc_type.values:
snowline_obs_lower.append(np.nan) # 25% snow cover
snowline_obs.append(np.nan) # 50% snow cover
snowline_obs_upper.append(np.nan)
continue
ds_use = ds_count.sel(t_sfc_type=date)
tmp_idx, tmp_obs = get_snowline(ds_use, thresholds=snow_cover_thresholds)
snowline_obs_lower.append(tmp_obs[0]) # 25% snow cover
snowline_obs.append(tmp_obs[1]) # 50% snow cover
snowline_obs_upper.append(tmp_obs[2]) # 75% snow cover
ds.coords["snowcover_frac"] = snow_cover_thresholds
ds["snowcover_frac"].attrs = {
"long_name": "Minimum snowcover fraction per elevation bin derived "
"from surface type observations"
}
ds["sfc_type_snowline"] = (
("snowcover_frac", "t_sfc_type"),
np.array([snowline_obs_lower, snowline_obs, snowline_obs_upper]),
)
# derive values for inf (fully or no snowcover)
elev_band_edges = get_elev_band_edges(ds, bin_interval=bin_interval)
ds["sfc_type_snowline"].attrs = {
"long_name": "Snowline altitude derived from glacier surface "
"classification",
"data_source": "ENVEO",
"units": "m",
"inf_values": str(
{
np.inf: float(np.max(elev_band_edges)),
-np.inf: float(np.min(elev_band_edges)),
}
),
}
return ds
[docs]
def get_elev_band_edges(ds, topo_data="topo_smoothed", bin_interval=30):
"""Get elevation band heights for selected interval in m.
Parameters
----------
ds
topo_data
bin_interval
Returns
-------
"""
glacier_topo_flat = ds[topo_data].data[ds.glacier_mask.astype(bool)]
max_glacier_elevation = np.max(glacier_topo_flat)
min_glacier_elevation = np.min(glacier_topo_flat)
# + and - bin_interval just to be sure everything is included
max_band_height = (
np.ceil(max_glacier_elevation / bin_interval) * bin_interval + bin_interval
)
min_band_height = (
np.floor(min_glacier_elevation / bin_interval) * bin_interval - bin_interval
)
return np.arange(max_band_height, min_band_height - 1, -bin_interval)
[docs]
def get_categories_per_elevation_band(
ds,
topo_data="topo_smoothed",
category_data="sfc_type_data",
category_uncertainty="sfc_type_uncertainty",
category_time_var="t_sfc_type",
nodata_value=np.nan,
bin_interval=30,
):
"""
Parameters
----------
ds
topo_data
category_data
category_uncertainty
category_time_var
nodata_value
bin_interval
Returns
-------
"""
# open needed data for aggregation
elevations = ds[topo_data].data[ds.glacier_mask.astype(bool)]
categories = ds[category_data].data[:, ds.glacier_mask.astype(bool)]
categories_attrs = ds[category_data].attrs
uncertainties = ds[category_uncertainty].data[:, ds.glacier_mask.astype(bool)]
uncertainties_attrs = ds[category_uncertainty].attrs
time_dim = categories.shape[0]
time_values = ds[category_time_var]
# Mask out nodata
if np.isnan(nodata_value):
valid_mask = ~np.isnan(categories)
else:
valid_mask = ~np.isin(categories, nodata_value)
# get elevation bands, need to be ascending for np.digitize
elev_band_edges = get_elev_band_edges(
ds, topo_data=topo_data, bin_interval=bin_interval
)
n_bins = elev_band_edges.size - 1
# assign elevation band indexes, -1 to start from 0
bin_ids = np.digitize(elevations, bins=elev_band_edges) - 1
def count_values_per_elevation_bin(data):
# Get unique valid categories (excluding nodata and -1)
unique_values = np.unique(data)
unique_values = np.sort(unique_values)
if np.isnan(nodata_value):
unique_values = unique_values[~np.isnan(unique_values)]
else:
unique_values = np.setdiff1d(unique_values, [nodata_value])
# we assign values from 0 to len(unique_cats) to be able to use hist 2d later
values_to_index = {value: i for i, value in enumerate(unique_values)}
# Create result array: shape should be (time, elevation_bin, valid categories)
result = np.zeros((time_dim, n_bins, len(unique_values)), dtype=int)
# Efficient per-time counting
for t in range(time_dim):
valid = valid_mask[t]
binned = bin_ids[valid]
values = data[t][valid]
# Map category values to indices
value_idx = np.array([values_to_index[c] for c in values])
hist2d = np.zeros((n_bins, len(unique_values)), dtype=int)
# Use np.add.at for fast 2D histogram
np.add.at(hist2d, (binned, value_idx), 1)
result[t] = hist2d
return result, unique_values
category_counts, unique_cats = count_values_per_elevation_bin(categories)
uncertainty_counts, unique_uncert = count_values_per_elevation_bin(uncertainties)
# create dataset of result
ds = xr.Dataset(
data_vars={
"category_counts": (
("t_sfc_type", "elevation_bin", "category"),
category_counts,
{
"long_name": "Count of glacier surface classification per "
"elevation band"
},
),
"uncertainty_counts": (
("t_sfc_type", "elevation_bin", "uncertainty_flag"),
uncertainty_counts,
{
"long_name": "Count of glacier surface classification "
"uncertainty per elevation band"
},
),
},
coords={
"t_sfc_type": time_values,
"elevation_bin": (
"elevation_bin",
range(n_bins),
{"long_name": "Index of elevation bin"},
),
"lower_elevation": (
"elevation_bin",
elev_band_edges[1:],
{"long_name": "Lower boundary of elevation bin"},
),
"upper_elevation": (
"elevation_bin",
elev_band_edges[:-1],
{"long_name": "Upper boundary of elevation bin"},
),
"category": ("category", unique_cats, categories_attrs),
"uncertainty_flag": (
"uncertainty_flag",
unique_uncert,
uncertainties_attrs,
),
},
)
return ds
[docs]
def exclude_empty_elevation_bins(ds):
"""
Parameters
----------
ds
Returns
-------
"""
# get the total number of observations per elevation band
number_grid_points_elev_bin = ds.category_counts.sum(dim="category")
# check that this number is the same for each timestamp
assert np.all(
number_grid_points_elev_bin == number_grid_points_elev_bin.isel(t_sfc_type=0)
)
# if the number of grid points is 0 there is no valid data
return ds.isel(
elevation_bin=np.where(number_grid_points_elev_bin.isel(t_sfc_type=0) != 0)[0]
)
[docs]
def exclude_dates_with_too_much_cloud_cover(ds, cloud_cover=0.5):
"""
Parameters
----------
ds
cloud_cover
Returns
-------
"""
relative_cloud_cover = ds.category_counts.sum(dim="elevation_bin").sel(
category=5
) / ds.category_counts.sum(dim=["elevation_bin", "category"])
return ds.isel(t_sfc_type=np.where(relative_cloud_cover < cloud_cover)[0])
[docs]
def get_snowline(ds, thresholds: list | None = None):
"""
Parameters
----------
ds
thresholds
Returns
-------
"""
# the provided ds should already be cleaned for scenes
da_counts = ds.category_counts
all_grid_points_elev_bin = da_counts.sum(dim="category")
cloud_grid_points = da_counts.sel(category=5)
# exclude completely cloud covered elevation bins
da_cloudfree = da_counts.isel(
elevation_bin=np.where(all_grid_points_elev_bin != cloud_grid_points)[0]
)
# calculate relative contributions, excluding cloud grid points
# we assume all surface types are equally covered by clouds
da_rel_snow = da_cloudfree.sel(category=1) / da_cloudfree.sel(
category=[1, 2, 3, 4]
).sum(dim="category")
# use different thresholds of the snow fraction for integrating uncertainty
def get_lowest_elev_bin_exceeding_threshold(bin_threshold):
exceeding_threshold = da_rel_snow > bin_threshold
# check if lowest bin is snow covered
if exceeding_threshold.isel(elevation_bin=-1):
return da_counts.elevation_bin.values[-1] + 1, -np.inf
# check if no bin exceeds threshold
elif not np.any(exceeding_threshold):
return -1, np.inf
else:
lowest_bin = da_rel_snow.sel(elevation_bin=exceeding_threshold).isel(
elevation_bin=-1
)
return (
lowest_bin.elevation_bin.values.item(),
lowest_bin.lower_elevation.values.item(),
)
lowest_elev_bin_idx = []
lowest_elev_bin_values = []
if not thresholds:
thresholds = [0.25, 0.5, 0.75]
for threshold in thresholds:
tmp_idx, tmp_value = get_lowest_elev_bin_exceeding_threshold(threshold)
lowest_elev_bin_idx.append(tmp_idx)
lowest_elev_bin_values.append(tmp_value)
return lowest_elev_bin_idx, lowest_elev_bin_values