"""Copyright 2025-2026 DTCG Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
=====
Bindings between DTCG and OGGM.
Executes an OGGM-API request.
"""
import csv
import json
from pathlib import Path
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely.geometry as shpg
import xarray as xr
from oggm import GlacierDirectory, cfg, tasks, utils, workflow
from oggm.shop import its_live, rgitopo, w5e5
import dtcg.datacube.cryotempo_eolis as cryotempo_eolis
import dtcg.integration.calibration
from dtcg.datacube.geozarr import GeoZarrHandler
# TODO: Link to DTCG instead.
# DEFAULT_BASE_URL = "https://cluster.klima.uni-bremen.de/~oggm/demo_gdirs"
# SHAPEFILE_PATH = (
# "https://cluster.klima.uni-bremen.de/~dtcg/test_files/case_study_regions/austria"
# )
[docs]
class BindingsOggmModel:
"""Bindings for interacting with OGGM and web repositories.
Attributes
----------
DEFAULT_BASE_URL : str
Base URL for OGGM data.
SHAPEFILE_PATH : str
Base URL for shapefile data.
WORKING_DIR : str, default None
Temporary working directory.
oggm_params : dict, default None
OGGM configuration parameters.
"""
[docs]
def __init__(
self,
base_url: str = "https://cluster.klima.uni-bremen.de/~oggm/demo_gdirs",
working_dir: str = "",
oggm_params: dict | None = None,
):
super().__init__()
self.DEFAULT_BASE_URL = base_url
self.WORKING_DIR = utils.gettempdir(working_dir)
if oggm_params is None:
self.oggm_params = {}
else:
self.oggm_params = oggm_params
[docs]
def set_oggm_params(self, **new_params: dict) -> None:
"""Define OGGM configuration parameters.
This will overwrite existing values that have the same key.
To configure OGGM, use ``set_oggm_kwargs()``.
Parameters
----------
**new_params
Parameters passed to OGGM's configuration.
"""
self.oggm_params.update(new_params)
[docs]
def set_oggm_kwargs(self) -> None:
"""Set OGGM configuration.
.. note:: This may eventually be moved to ``api.internal._parse_oggm``.
Parameters
----------
oggm_params : dict
Key/value pairs corresponding to valid OGGM configuration
parameters.
"""
valid_keys = cfg.PARAMS.keys()
for key, value in self.oggm_params.items():
if key in valid_keys:
cfg.PARAMS[key] = value
[docs]
def init_oggm(self, dirname: str, **kwargs) -> None:
"""Initialise OGGM run parameters.
TODO: Add kwargs for cfg.PATH.
Parameters
----------
dirname : str
Name of temporary directory
**kwargs
Extra arguments passed to OGGM's configuration.
"""
cfg.initialize(logging_level="CRITICAL")
cfg.PARAMS["border"] = kwargs.get("border", 80)
self.WORKING_DIR = utils.gettempdir(dirname) # already handles empty strings
utils.mkdir(
self.WORKING_DIR, reset=True
) # TODO: this should be an API parameter
cfg.PATHS["working_dir"] = self.WORKING_DIR
self.set_oggm_params(**kwargs)
self.set_oggm_kwargs()
[docs]
def get_matching_region_codes(
self, region_name: str = "", subregion_name: str = ""
) -> set:
"""Get region and subregion codes matching a given subregion name.
Subregion names take priority over region names.
.. todo:: Fuzzy-finding.
Parameters
----------
region_name : str
The full name of a region.
subregion_name : str
The full name of a subregion.
Returns
-------
set[str]
All pairs of region and subregion codes matching the given
subregion or region's name.
Raises
------
KeyError
If the region or subregion name is not found.
ValueError
If no valid region or subregion name is supplied.
"""
matching_region = set()
if subregion_name:
region_db = self.get_rgi_metadata("rgi_subregions_V6.csv", from_web=True)
for row in region_db:
if subregion_name.lower() in row["Full_name"].lower():
region_codes = (row["O1"], row["O2"])
# zfill should be applied only when using utils.parse_rgi_meta
# region_codes = (row["O1"].zfill(2), row["O2"].zfill(2))
matching_region.add(region_codes)
elif region_name:
region_db = self.get_rgi_metadata("rgi_regions.csv", from_web=True)
for row in region_db:
if region_name.lower() in row["Full_name"].lower():
region_codes = (row["RGI_Code"].zfill(2),)
# zfill should be applied only when using utils.parse_rgi_meta
# region_codes = (row["O1"].zfill(2), row["O2"].zfill(2))
matching_region.add(region_codes)
else:
raise ValueError("No valid region or subregion name supplied.")
if not matching_region:
if region_name and subregion_name:
msg = f"{region_name}, {subregion_name}"
else:
msg = f"{region_name}{subregion_name}" # since one is empty
raise KeyError(f"No region found for {msg}.")
return matching_region
[docs]
def get_rgi_region_codes(
self, region_name: str = "", subregion_name: str = ""
) -> set:
"""Get RGI region and subregion codes from a subregion name.
This can be replaced with OGGM backend if available.
Parameters
----------
region_name : str, optional
Name of RGI region.
subregion_name : str, optional
Name of RGI subregion.
Returns
-------
set[str]
RGI region (O1) and subregion (O2) codes.
Raises
------
KeyError
If the subregion name is not available.
TypeError
If subregion name is not a string.
"""
try:
rgi_codes = self.get_matching_region_codes(
region_name=region_name, subregion_name=subregion_name
)
except KeyError as e:
region_names = ", ".join(filter(None, (region_name, subregion_name)))
raise KeyError(f"No regions or subregion matching {region_names}.")
except (TypeError, ValueError, AttributeError) as e:
region_names = ", ".join(filter(None, (region_name, subregion_name)))
raise TypeError(f"{region_names} is not a string.")
return rgi_codes
[docs]
def get_rgi_files_from_subregion(
self, region_name: str = "", subregion_name: str = ""
) -> list:
"""Get RGI shapefile from a subregion name.
Parameters
----------
region_name : str, optional
Name of RGI region.
subregion_name : str, optional
Name of RGI subregion.
Returns
-------
list
RGI shapefiles.
Raises
------
KeyError
If no glaciers are found for the given subregion.
"""
rgi_region_codes = self.get_rgi_region_codes(
region_name=region_name, subregion_name=subregion_name
)
rgi_files = []
for codes in rgi_region_codes:
path = utils.get_rgi_region_file(region=codes[0])
candidate = gpd.read_file(path)
if subregion_name:
rgi_files.append(candidate[candidate["O2Region"] == codes[1]])
else:
rgi_files.append(candidate)
try:
rgi_files = rgi_files[0]
except KeyError as e:
raise KeyError(f"No glaciers found for {subregion_name}.")
return rgi_files
[docs]
def get_glacier_directories(
self, rgi_ids: list, **kwargs
) -> list[GlacierDirectory]:
"""Get OGGM glacier directories.
Parameters
----------
rgi_ids : list
RGI IDs of glaciers.
base_url : str, default empty string
URL to OGGM data. If empty, uses the ``DEFAULT_BASE_URL``
attribute.
prepro_level : int, default 4
File preprocessing level.
prepro_border : int, default 80
Grid border buffer around the glacier.
**kwargs
Extra arguments to ``workflow.init_glacier_directories``.
Returns
-------
list[GlacierDirectory]
Glacier directories for the given RGI IDs.
"""
prepro_base_url = kwargs.get("prepro_base_url", "")
if not prepro_base_url:
kwargs["prepro_base_url"] = self.DEFAULT_BASE_URL
gdirs = workflow.init_glacier_directories(rgi_ids, **kwargs)
return gdirs
[docs]
def get_outline_source_date(self, glacier_data: gpd.GeoDataFrame) -> int:
"""Get the date for an outline's source.
Parameters
----------
glacier_data : gpd.GeoDataFrame
Outline data for a glacier. Must conform to
`RGI60 specifications <https://www.glims.org/RGI/00_rgi60_TechnicalNote.pdf>`__.
Returns
-------
int
The year the outline's source data was published.
"""
outline_date = glacier_data.get("EndDate", "-9999999")
if outline_date == "-9999999":
outline_date = glacier_data.get("BgnDate", "-9999999")
outline_date = int(outline_date[:4])
return outline_date
[docs]
def set_flowlines(self, gdir: GlacierDirectory) -> None:
"""Compute glacier flowlines if missing from glacier directory.
Parameters
----------
gdir : GlacierDirectory
Glacier directory.
"""
if not Path(gdir.get_filepath("inversion_flowlines")).exists():
if not Path(gdir.get_filepath("elevation_band_flowline")).exists():
tasks.elevation_band_flowline(gdir=gdir, preserve_totals=True)
tasks.fixed_dx_elevation_band_flowline(gdir, preserve_totals=True)
[docs]
class BindingsOggmWrangler(BindingsOggmModel):
"""Wrangles input data for OGGM workflows.
Attributes
----------
SHAPEFILE_PATH : str
Region or subregion's glacier outlines.
"""
[docs]
def __init__(
self,
base_url: str = "https://cluster.klima.uni-bremen.de/~oggm/demo_gdirs",
working_dir: str = "",
oggm_params: dict = None,
shapefile_path: str = "https://cluster.klima.uni-bremen.de/~dtcg/test_files/case_study_regions/austria",
):
super().__init__(
base_url=base_url, working_dir=working_dir, oggm_params=oggm_params
)
self.SHAPEFILE_PATH = shapefile_path
[docs]
def get_shapefile(self, path: str) -> gpd.GeoDataFrame:
"""Get shapefile from user path.
Parameters
----------
path : str
Path to shapefile.
"""
shapefile = gpd.read_file(path)
return shapefile
[docs]
def get_shapefile_from_web(self, shapefile_name: str) -> gpd.GeoDataFrame:
"""Placeholder for getting shapefiles from an online repository.
Parameters
----------
shapefile_name : str
Name of file in ``oggm-sample-data`` repository.
"""
path = utils.get_demo_file(shapefile_name)
shapefile = gpd.read_file(path)
return shapefile
[docs]
def get_cached_outline_data(self, cache_path: Path) -> gpd.GeoDataFrame:
"""Get glacier outlines.
This is identical to ``gdir.read_shapefile``, so the CRS should
later be converted to EPSG:4236"""
try:
glacier_outlines = gpd.read_feather(cache_path / "outlines.shp")
except FileNotFoundError:
return None
return glacier_outlines
def get_outline_details(self, polygon):
outline_details = {
"Name": {"value": polygon["Name"], "unit": ""},
"RGI ID": {"value": polygon["RGIId"], "unit": ""},
"GLIMS ID": {"value": polygon["GLIMSId"], "unit": ""},
"Area": {"value": float(polygon["Area"]), "unit": "km²"},
"Max Elevation": {"value": polygon["Zmax"], "unit": "m"},
"Min Elevation": {"value": polygon["Zmin"], "unit": "m"},
"Latitude": {"value": float(polygon["CenLat"]), "unit": "°N"},
"Longitude": {"value": float(polygon["CenLon"]), "unit": "°E"},
"Outline Date": {
"value": self.get_outline_source_date(polygon),
"unit": "",
},
}
return outline_details
[docs]
def set_subregion_constraints(
self,
region: gpd.GeoDataFrame,
subregion: gpd.GeoDataFrame,
subregion_id: str = "",
) -> gpd.GeoDataFrame:
"""Reproject subregional data to match regional data, and
optionally mask by subregion name.
Parameters
----------
region : gpd.GeoDataFrame
Shapefile data for region.
subregion : gpd.GeoDataFrame
Shapefile data for subregion.
subregion_id : str, optional
Subregion id.
Returns
-------
gpd.GeoDataFrame
Reprojected subregion data, optionally masked to a specific
subregion name.
"""
if region.crs != subregion.crs:
subregion = subregion.to_crs(region.crs)
if subregion_id:
subregion = subregion[subregion["id"] == subregion_id]
return subregion
[docs]
def get_glaciers_in_subregion(
self, region: gpd.GeoDataFrame, subregion: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
"""Get only the glaciers in a given subregion.
Parameters
----------
region : gpd.GeoDataFrame
Contains all glaciers in a region.
subregion : gpd.GeoDataFrame
Subregion shapefile.
Returns
-------
gpd.GeoDataFrame
Only glaciers inside the given subregion.
"""
# frame indices are fixed, so index by location instead
subregion_mask = [
subregion.geometry.contains(shpg.Point(x, y)).iloc[0]
for (x, y) in zip(region.CenLon, region.CenLat)
]
region = region.loc[subregion_mask]
region = region.sort_values("Area", ascending=False)
return region
[docs]
def get_polygon_difference(self, frame: gpd.GeoDataFrame) -> pd.DataFrame:
"""Get the difference of overlapping polygons.
.. note:: GPD's overlay() only acts on GeoDataFrames, but this
function manipulates GeoSeries.
Parameters
----------
frame : gpd.GeoDataFrame
Overlapping polygons.
Returns
-------
gpd.GeoSeries
Polygon geometries adjusted so no areas overlap.
"""
diffs = frame.iloc[1:].geometry.difference(
frame.iloc[:-1].geometry, align=False
)
# use GeoSeries instead of pd.Series for consistent typing & CRS
new_geometry = pd.concat(
[gpd.GeoSeries(frame.geometry.iloc[0], crs=diffs.crs), diffs]
)
return new_geometry
[docs]
def set_polygon_overlap(self, frame: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Set overlapping boundaries.
If two polygons overlap, the overlapping area is removed from the
second polygon.
Parameters
----------
frame : gpd.GeoDataFrame
Contains overlapping polygons.
Returns
-------
gpd.GeoDataFrame
Polygon geometries adjusted so no areas overlap.
"""
merge = frame.copy()
new_geometry = self.get_polygon_difference(frame=frame)
merge.geometry = new_geometry
return merge
[docs]
def get_gdir_by_name(self, data: gpd.GeoDataFrame, name: str, **kwargs):
"""Get glacier directory from a glacier's name or RGI ID.
Parameters
----------
data : gpd.GeoDataFrame
Glacier data.
name : str
Name or RGI ID of glacier.
**kwargs
Additional arguments passed to ``workflow.init_glacier_directories``.
"""
rgi_id = self.get_glacier_by_name(data=data, name=name)["RGIId"].values
gdirs = self.get_glacier_directories(rgi_ids=rgi_id, **kwargs)
return gdirs[0]
[docs]
def get_glacier_by_name(
self, data: gpd.GeoDataFrame, name: str
) -> gpd.GeoDataFrame:
"""Get glacier data by full name or RGI ID.
Parameters
----------
data : gpd.GeoDataFrame
Glacier data.
name : str
Full name or RGI ID of glacier.
Returns
-------
gpd.GeoDataFrame
Glacier with matching name or RGI ID.
"""
name = name.replace("ö", "oe")
glacier = data[data["Name"] == name]
if glacier.empty: # fallback in case of RGI ID
glacier = self.get_glacier_by_rgi_id(data=data, rgi_id=name)
return glacier
[docs]
def get_glacier_by_rgi_id(
self, data: gpd.GeoDataFrame, rgi_id: str
) -> gpd.GeoDataFrame:
"""Get glacier data by RGI ID.
Parameters
----------
data : gpd.GeoDataFrame
Glacier data.
rgi_id : str
RGI ID of glacier.
Returns
-------
gpd.GeoDataFrame
Glacier with matching RGI ID.
"""
glacier = data[data["RGIId"] == rgi_id]
if glacier.empty:
raise KeyError(f"{rgi_id} not found.")
return glacier
[docs]
def get_named_glaciers(self, glaciers: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Get all glaciers with a full name.
Parameters
----------
glaciers : gpd.GeoDataFrame
Glacier data.
Returns
-------
gpd.GeoDataFrame
All named glaciers.
"""
return glaciers.dropna(subset="Name")
[docs]
def get_user_subregion(
self,
region_name: str = "",
subregion_name: str = "",
shapefile_path: str = "",
**oggm_params,
) -> dict:
"""Get user-selected subregion in a catchment area.
This should be called via gateway.
Parameters
----------
region_name : str, optional
Name of RGI region.
subregion_name : str, optional
Name of RGI subregion.
shapefile_path : str, optional
Path to shapefile with catchment area outlines.
**oggm_params
Additional OGGM configuration parameters.
"""
if subregion_name:
temp_name = f"OGGM_{subregion_name}"
elif region_name:
temp_name = f"OGGM_{region_name}"
else:
raise ValueError("No region or subregion name supplied.")
self.init_oggm(dirname=temp_name, **oggm_params)
if shapefile_path: # if user uploads/selects a shapefile
if isinstance(shapefile_path, str):
shapefile_path = f"{self.SHAPEFILE_PATH}/{shapefile_path}"
user_shapefile = self.get_shapefile(path=shapefile_path)
rgi_file = self.get_rgi_files_from_subregion(region_name=region_name)
user_shapefile = self.set_subregion_constraints(
region=rgi_file, subregion=user_shapefile, subregion_id=subregion_name
)
rgi_file = self.get_glaciers_in_subregion(
region=rgi_file, subregion=user_shapefile
)
else:
rgi_file = self.get_rgi_files_from_subregion(
region_name=region_name, subregion_name=subregion_name
)
user_shapefile = None
return {"glacier_data": rgi_file, "shapefile": user_shapefile}
[docs]
class BindingsHydro(BindingsOggmWrangler):
"""Bindings for running model with hydrological spinup.
Attributes
----------
hydro_location : str
Output directory name.
"""
[docs]
def __init__(
self,
base_url: str = "https://cluster.klima.uni-bremen.de/~oggm/demo_gdirs",
working_dir: str = "",
oggm_params: dict = None,
hydro_location: str = "_spinup_historical_hydro",
):
super().__init__(
base_url=base_url, working_dir=working_dir, oggm_params=oggm_params
)
self.hydro_location = hydro_location
def get_compiled_run_output(
self,
gdir,
keys: list = None,
input_filesuffix: str = "",
) -> xr.Dataset:
if not input_filesuffix:
input_filesuffix = self.hydro_location
ds = utils.compile_run_output(gdir, input_filesuffix=input_filesuffix)
if keys:
ds = ds[keys]
return ds
def get_specific_mass_balance(self, ds, rgi_id: str):
volume = ds.loc[{"rgi_id": rgi_id}].volume.values
area = ds.loc[{"rgi_id": rgi_id}].area.values
smb = (volume[1:] - volume[:-1]) / area[1:] * cfg.PARAMS["ice_density"] / 1000
return smb
# volume = ds.volume_m3.values
# area = ds.area_m2.values
# smb = (volume[1:] - volume[:-1]) / area[1:] * cfg.PARAMS['ice_density'] / 1000
# return smb
def get_hydro_climatology(self, gdir, nyears: int = 20) -> xr.Dataset:
workflow.execute_entity_task(
tasks.run_with_hydro,
gdir, # Select the one glacier but actually should be a list
run_task=tasks.run_from_climate_data,
init_model_filesuffix="_spinup_historical",
init_model_yr=1979,
ref_area_yr=2000, # Important ! Fixed gauge runoff with ref year 2000
ys=1979,
ye=2020,
output_filesuffix=self.hydro_location,
store_monthly_hydro=True,
)[0]
with xr.open_dataset(
gdir.get_filepath("model_diagnostics", filesuffix=self.hydro_location)
) as ds:
ds = ds.isel(time=slice(0, -1)).load()
return ds
def get_annual_runoff(self, ds: xr.Dataset):
sel_vars = [v for v in ds.variables if "month_2d" not in ds[v].dims]
df_annual = ds[sel_vars].to_dataframe()
runoff_vars = [
"melt_off_glacier",
"melt_on_glacier",
"liq_prcp_off_glacier",
"liq_prcp_on_glacier",
]
df_runoff = df_annual[runoff_vars] * 1e-9
return df_runoff
[docs]
def get_min_max_runoff_years(
self, annual_runoff: pd.Series, nyears: int = 20
) -> tuple:
"""Get the years with minimum and maximum runoff.
Parameters
----------
annual_runoff : pd.Series
Time series of annual runoff.
nyears : int, default 20
Time period in years. The end date is always the most
recent available year.
Returns
-------
tuple[int]
The years with minimum and maximum runoff.
"""
if len(annual_runoff) > nyears:
annual_runoff = annual_runoff.iloc[-nyears:]
runoff = annual_runoff.sum(axis=1)
runoff_year_min = int(runoff.idxmin())
runoff_year_max = int(runoff.idxmax())
return runoff_year_min, runoff_year_max
[docs]
def get_monthly_runoff(self, ds: xr.Dataset, nyears: int = 20) -> xr.DataArray:
"""Get the monthly glacier runoff.
Parameters
----------
ds : xr.Dataset
Glacier climatology.
nyears : int, default 20
Time period in years. The end date is always the most
recent available year.
Returns
-------
xr.Dataset
Monthly runoff.
"""
monthly_runoff = (
ds["melt_off_glacier_monthly"]
+ ds["melt_on_glacier_monthly"]
+ ds["liq_prcp_off_glacier_monthly"]
+ ds["liq_prcp_on_glacier_monthly"]
)
if len(monthly_runoff) > nyears:
monthly_runoff = monthly_runoff.isel(time=slice(nyears, None))
monthly_runoff *= 1e-9
return monthly_runoff
[docs]
def get_climatology(self, data: gpd.GeoDataFrame, name: str = "") -> tuple:
"""Get climatological data for a glacier.
Parameters
----------
data : gpd.GeoDataFrame
Contains data for one or more glaciers.
name : str, optional
Name of glacier.
Returns
-------
tuple[xr.Dataset,np.ndarray]
Climatological data and specific mass balance for a glacier.
"""
glacier = self.get_gdir_by_name(
data=data,
name=name,
from_prepro_level=5,
prepro_border=160,
prepro_base_url="https://cluster.klima.uni-bremen.de/~oggm/gdirs/oggm_v1.6/L3-L5_files/2023.3/elev_bands/W5E5_spinup",
)
climatology = self.get_hydro_climatology(gdir=glacier)
try:
mass_balance = self.get_compiled_run_output(
gdir=glacier,
# keys=["volume", "area"],
input_filesuffix=self.hydro_location,
)
mass_balance = self.get_specific_mass_balance(
ds=mass_balance, rgi_id=glacier.rgi_id
)
ref_data = glacier.get_ref_mb_data()
except RuntimeError:
ref_data = None
mass_balance = None
return climatology, ref_data, mass_balance
def get_aggregate_runoff(self, data: gpd.GeoDataFrame) -> dict:
annual_runoff = []
monthly_runoff = []
mass_balance = []
observations = []
for rgi_id in data["RGIId"]:
climatology, ref_data, specific_mb = self.get_climatology(
data=data, name=rgi_id
)
mass_balance.append(specific_mb)
observations.append(ref_data)
annual_runoff.append(self.get_annual_runoff(ds=climatology))
monthly_runoff.append(self.get_monthly_runoff(ds=climatology))
total_annual_runoff = sum(annual_runoff)
total_monthly_runoff = sum(monthly_runoff)
min_year, max_year = self.get_min_max_runoff_years(
annual_runoff=total_annual_runoff
)
runoff_data = {
"annual_runoff": total_annual_runoff,
"monthly_runoff": total_monthly_runoff,
"runoff_year_min": min_year,
"runoff_year_max": max_year,
"mass_balance": mass_balance,
"wgms": observations,
}
return runoff_data
def get_runoff(self, data: xr.Dataset, name: str) -> dict:
climatology, observations, mass_balance = self.get_climatology(
data=data, name=name
)
annual_runoff = self.get_annual_runoff(ds=climatology)
monthly_runoff = self.get_monthly_runoff(ds=climatology)
min_year, max_year = self.get_min_max_runoff_years(annual_runoff=annual_runoff)
runoff_data = {
"annual_runoff": annual_runoff,
"monthly_runoff": monthly_runoff,
"runoff_year_min": min_year,
"runoff_year_max": max_year,
"mass_balance": mass_balance,
"wgms": observations,
}
return runoff_data
[docs]
class BindingsCryotempo(BindingsOggmWrangler):
"""Bindings for interacting with CryoTEMPO-EOLIS data.
Attributes
----------
hydro_location : str
Output directory name.
"""
[docs]
def __init__(
self,
base_url: str = "https://cluster.klima.uni-bremen.de/~dtcg/gdirs/v2026.1",
working_dir: str = "",
oggm_params: dict = {
"border": 160,
"store_model_geometry": True,
},
hydro_location: str = "_spinup_historical_hydro",
):
super().__init__(
base_url=base_url, working_dir=working_dir, oggm_params=oggm_params
)
self.datacube_manager = cryotempo_eolis.DatacubeCryotempoEolis()
self.calibrator = dtcg.integration.calibration.CalibratorCryotempo()
self.hydro_location = hydro_location
[docs]
def init_oggm(self, dirname="", **kwargs):
return super().init_oggm(dirname, **kwargs)
[docs]
def get_glacier_directories(self, rgi_ids: list, **kwargs):
return super().get_glacier_directories(rgi_ids, **kwargs)
[docs]
def get_glacier_data(self, gdirs: list, dem=False) -> None:
"""Add velocity data, monthly and daily W5E5 data."""
if dem:
workflow.execute_entity_task(
rgitopo.select_dem_from_dir,
gdirs,
dem_source="COPDEM90",
keep_dem_folders=True,
)
workflow.execute_entity_task(tasks.glacier_masks, gdirs)
workflow.execute_entity_task(its_live.itslive_velocity_to_gdir, gdirs)
# monthly data needed to prevent silent failures
workflow.execute_entity_task(gdirs=gdirs, task=w5e5.process_w5e5_data)
workflow.execute_entity_task(
gdirs=gdirs, task=w5e5.process_w5e5_data, daily=True
)
[docs]
def get_eolis_data(self, gdir):
"""Get gridded data enhanced with CryoTEMPO-EOLIS data."""
with xr.open_dataset(gdir.get_filepath("gridded_data")) as datacube:
datacube = datacube.load()
keywords = (
"rgi",
"glacier",
"name",
"terminus",
"cenl", # latitude, longitude
"glims",
)
glacier_attributes = {
key: val
for key, val in gdir.__dict__.items()
if any(k in key for k in keywords)
}
datacube.attrs.update({"glacier_attributes": glacier_attributes})
self.datacube_manager.retrieve_prepare_eolis_gridded_data(
oggm_ds=datacube, grid=gdir.grid
)
geozarr_handler = GeoZarrHandler(datacube)
return gdir, geozarr_handler
[docs]
def get_aggregate_runoff(self, gdir) -> dict:
"""Get the computed runoff from OGGM."""
annual_runoff = []
monthly_runoff = []
mass_balance = []
observations = []
climatology, ref_data, specific_mb = self.get_climatology(
name=gdir.rgi_id, gdir=gdir
)
mass_balance.append(specific_mb)
observations.append(ref_data)
annual_runoff.append(self.get_annual_runoff(ds=climatology))
monthly_runoff.append(self.get_monthly_runoff(ds=climatology))
total_annual_runoff = sum(annual_runoff)
total_monthly_runoff = sum(monthly_runoff)
min_year, max_year = self.get_min_max_runoff_years(
annual_runoff=total_annual_runoff
)
runoff_data = {
"annual_runoff": total_annual_runoff,
"monthly_runoff": total_monthly_runoff,
"runoff_year_min": min_year,
"runoff_year_max": max_year,
"mass_balance": mass_balance,
"wgms": observations,
}
return runoff_data
[docs]
def get_monthly_runoff(self, ds: xr.Dataset, nyears: int = 20) -> xr.DataArray:
"""Get the monthly glacier runoff.
Parameters
----------
ds : xr.Dataset
Glacier climatology.
nyears : int, default 20
Time period in years. The end date is always the most
recent available year.
Returns
-------
xr.Dataset
Monthly runoff.
"""
monthly_runoff = (
ds["melt_off_glacier_monthly"]
+ ds["melt_on_glacier_monthly"]
+ ds["liq_prcp_off_glacier_monthly"]
+ ds["liq_prcp_on_glacier_monthly"]
)
if len(monthly_runoff) > nyears:
monthly_runoff = monthly_runoff.isel(time=slice(nyears, None))
monthly_runoff *= 1e-9
return monthly_runoff
[docs]
def get_climatology(self, gdir, name: str = "") -> tuple:
"""Get climatological data for a glacier.
Parameters
----------
data : gpd.GeoDataFrame
Contains data for one or more glaciers.
name : str, optional
Name of glacier.
Returns
-------
tuple[xr.Dataset,np.ndarray]
Climatological data and specific mass balance for a glacier.
"""
climatology = self.get_hydro_climatology(gdir=gdir)
try:
mass_balance = self.get_compiled_run_output(
gdir=gdir,
# keys=["volume", "area"],
input_filesuffix=self.hydro_location,
)
mass_balance = self.get_specific_mass_balance(
ds=mass_balance, rgi_id=gdir.rgi_id
)
ref_data = gdir.get_ref_mb_data()
except RuntimeError:
ref_data = None
mass_balance = None
return climatology, ref_data, mass_balance
def get_annual_runoff(self, ds: xr.Dataset):
sel_vars = [v for v in ds.variables if "month_2d" not in ds[v].dims]
df_annual = ds[sel_vars].to_dataframe()
runoff_vars = [
"melt_off_glacier",
"melt_on_glacier",
"liq_prcp_off_glacier",
"liq_prcp_on_glacier",
]
df_runoff = df_annual[runoff_vars] * 1e-9
return df_runoff
[docs]
def get_min_max_runoff_years(
self, annual_runoff: pd.Series, nyears: int = 20
) -> tuple:
"""Get the years with minimum and maximum runoff.
Parameters
----------
annual_runoff : pd.Series
Time series of annual runoff.
nyears : int, default 20
Time period in years. The end date is always the most
recent available year.
Returns
-------
tuple[int]
The years with minimum and maximum runoff.
"""
if len(annual_runoff) > nyears:
annual_runoff = annual_runoff.iloc[-nyears:]
runoff = annual_runoff.sum(axis=1)
runoff_year_min = int(runoff.idxmin())
runoff_year_max = int(runoff.idxmax())
return runoff_year_min, runoff_year_max
def get_hydro_climatology(self, gdir, nyears: int = 20) -> xr.Dataset:
workflow.execute_entity_task(
tasks.run_with_hydro,
gdir, # Select the one glacier but actually should be a list
run_task=tasks.run_from_climate_data,
init_model_filesuffix="_spinup_historical",
init_model_yr=1979,
ref_area_yr=2000, # Important ! Fixed gauge runoff with ref year 2000
ys=1979,
ye=2020,
output_filesuffix=self.hydro_location,
store_monthly_hydro=True,
)[0]
with xr.open_dataset(
gdir.get_filepath("model_diagnostics", filesuffix=self.hydro_location)
) as ds:
ds = ds.isel(time=slice(0, -1)).load()
return ds
def get_compiled_run_output(
self,
gdir,
keys: list = None,
input_filesuffix: str = "",
) -> xr.Dataset:
if not input_filesuffix:
input_filesuffix = self.hydro_location
ds = utils.compile_run_output(gdir, input_filesuffix=input_filesuffix)
if keys:
ds = ds[keys]
return ds
def get_specific_mass_balance(self, ds, rgi_id: str):
volume = ds.loc[{"rgi_id": rgi_id}].volume.values
area = ds.loc[{"rgi_id": rgi_id}].area.values
smb = (volume[1:] - volume[:-1]) / area[1:] * cfg.PARAMS["ice_density"] / 1000
return smb
def get_cached_data(
self, rgi_id: str, cache="../../ext/data/l2_precompute/"
) -> dict:
if isinstance(cache, str):
cache = Path(cache)
cache_path = cache / rgi_id
gdir = self.get_cached_gdir_data(cache_path=cache_path)
smb = self.get_cached_smb_data(cache_path=cache_path)
runoff = {}
for key in [
"Daily_Hugonnet_2000_2020",
"Daily_Cryosat_2011_2020",
"SfcDaily_Cryosat_2011_2020",
]:
runoff[key] = self.get_cached_runoff_data(cache_path=cache_path, suffix=key)
# runoff["runoff_year_min"] = gdir["runoff_data"]["runoff_year_min"]
# runoff["runoff_year_max"] = gdir["runoff_data"]["runoff_year_max"]
eolis = self.get_cached_eolis_data(cache_path=cache_path)
outlines = self.get_cached_outline_data(cache_path=cache_path)
cached_data = {
"gdir": gdir,
"smb": smb,
"runoff": runoff,
"eolis": eolis,
"outlines": outlines,
}
return cached_data
def get_cached_l1_data(self, rgi_id: str, cache="../../ext/data/l1_precompute/"):
if isinstance(cache, str):
cache = Path(cache)
cache_path = cache / rgi_id
gdir = self.get_cached_gdir_data(cache_path=cache_path)
return gdir
def get_cached_gdir_data(self, cache_path: Path) -> dict:
try:
with open(cache_path / "gdir.json", mode="r", encoding="utf-8") as file:
raw = file.read()
gdir = dict(json.loads(raw))
except FileNotFoundError:
return None
return gdir
def get_cached_smb_data(self, cache_path: Path) -> np.ndarray:
try:
smb = np.load(cache_path / "smb.npz")
except FileNotFoundError:
return None
return smb
def get_cached_runoff_data(
self, cache_path: Path, suffix="Daily_Hugonnet_2000_2020"
) -> dict:
try:
runoff = xr.open_dataarray(cache_path / f"runoff_{suffix}.nc")
except FileNotFoundError:
return None
runoff_data = {"monthly_runoff": runoff}
return runoff_data
def get_cached_eolis_data(self, cache_path: Path) -> xr.DataArray:
try:
eolis_data = xr.open_dataset(cache_path / "eolis.nc")
# eolis_data = {
# "eolis_elevation_change_timeseries": eolis_raw.eolis_elevation_change_timeseries,
# "eolis_elevation_change_sigma_timeseries": eolis_raw.eolis_elevation_change_sigma_timeseries,
# }
except FileNotFoundError:
return None
return eolis_data
def get_cached_metadata(
self, index="glacier_index", cache="../../ext/data/l2_precompute/"
):
if isinstance(cache, str):
cache = Path(cache)
cache_path = cache / index
with open(
cache_path / f"glacier_index.json", mode="r", encoding="utf-8"
) as file:
raw = file.read()
metadata = dict(json.loads(raw))
return metadata