"""Copyright 2025 DTCG Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
=====
Calibrate OGGM models.
"""
import inspect
import logging
import os
import warnings
from datetime import datetime
from functools import partial
import numpy as np
import pandas as pd
import SALib.sample.sobol as sampler
import xarray as xr
from dateutil import tz
from dateutil.relativedelta import relativedelta
from dateutil.tz import UTC
from numpy.typing import ArrayLike
from oggm import GlacierDirectory, cfg, entity_task, tasks, utils, workflow
from oggm.core import massbalance
from tqdm import tqdm
# Module logger
log = logging.getLogger(__name__)
[docs]
class Calibrator:
"""Bindings for calibrating OGGM models.
Attributes
----------
model_matrix : dict, default None
Matrix of model calibration parameters for different run
configurations.
datacube_l1 : Any, default None
gdir : GlacierDirectory, default None
An OGGM glacier directory.
"""
[docs]
def __init__(self, model_matrix: dict = None, datacube_l1=None, gdir=None):
if not model_matrix:
self.model_matrix = {}
else:
self.model_matrix = model_matrix
self.gdir = gdir
self.datacube_l1 = datacube_l1
if datacube_l1 is not None:
self.RGI_ID_attrs = datacube_l1.attrs["RGI-ID"]
else:
self.RGI_ID_attrs = "Not provided"
[docs]
def set_model_matrix(self, name: str, model, geo_period: str, **kwargs):
"""Set model parameters for calibration.
Parameters
----------
name : str
Name of configuration parameters. Ideally this should match
the settings filesuffix.
model : oggm.core.massbalance.MassBalanceModel
OGGM mass balance model class.
geo_period : str
Reference mass balance period in the format
YYYY-MM-DD_YYYY-MM-DD.
kwargs
Additional arguments passed to the mass balance model
class.
"""
matrix = {
name: {
"model": model,
"geo_period": geo_period,
}
}
if kwargs:
matrix[name].update(kwargs)
self.model_matrix.update(matrix)
[docs]
def get_nearest_datetime(self, dates: np.ndarray, target_date: str):
"""Get the nearest datetime.
Parameters
----------
dates : array_like
A range of dates that can be converted to ``datetime64``.
target_date: str
Date in the ISO-8601 format "YYYY-MM-DD".
Returns
-------
"""
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="no explicit representation of timezones available for np.datetime64",
category=UserWarning,
)
dates_ns = dates.astype("datetime64[ns]")
target_ns = np.datetime64(datetime.strptime(target_date, "%Y-%m-%d"), "ns")
diffs = np.abs(dates_ns - target_ns)
idx = int(np.argmin(diffs))
return dates[idx]
[docs]
def get_mcs_parameter_sample(
self,
gdir: GlacierDirectory,
control_filesuffix: str,
nr_samples: int = 2**4,
ref_mb_err_sample_interval: float = 1.0,
prcp_fac_sample_interval: float = 0.5,
temp_bias_sample_interval: float = 0.5,
melt_f_sample_interval: float = 0.25,
):
"""Defines the distribution of the observation and model
parameters and create a sample for the Monte Carlo simulation.
Parameters
----------
gdir : GlacierDirectory
control_filesuffix: str
The filesuffix for the control run's settings, which used
only the most likely values of the default calibration.
nr_samples: int, default 2^4.
Number of samples. Must be a multiple of 2. The total number
of generated samples equals ``nr_samples * (4 + 2)``, where
4 is the number of variables used for sampling (``ref_mb``,
``prcp_fac``, ``melt_f``, ``temp_bias``).
ref_mb_err_sample_interval: float, default 1.0
Multiplication factor applied to ``ref_mb_err``. It assumes
``ref_mb`` has a normal distribution.
prcp_fac_sample_interval: float, default 0.5
The width of the bounds which are added and subtracted from
the ``prcp_fac`` of the control run. A uniform distribution
is used for sampling.
temp_bias_sample_interval: float, default 0.5
The width of the bounds which are added and subtracted from
the ``temp_bias`` of the control run. A uniform distribution
is used for sampling.
melt_f_sample_interval: float, default 0.5
The width of the bounds which are added and subtracted from
the ``melt_f`` of the control run. A uniform distribution
is used for sampling.
Returns
-------
tuple
"""
# to have a better overview I provide the parameter distribution
# in a more readable way first
parameter_distribution = {}
# add geodetic mb observation
gdir.observations_filesuffix = control_filesuffix
ref_mb = gdir.observations["ref_mb"]
parameter_distribution["ref_mb"] = {
"bounds": [ref_mb["value"], ref_mb["err"] * ref_mb_err_sample_interval],
"dists": "norm",
}
# get default maximum values for mb parameters
gdir.settings_filesuffix = ""
prcp_fac_min = gdir.settings["prcp_fac_min"]
prcp_fac_max = gdir.settings["prcp_fac_max"]
melt_f_min = gdir.settings["melt_f_min"]
melt_f_max = gdir.settings["melt_f_max"]
temp_bias_min = gdir.settings["temp_bias_min"]
temp_bias_max = gdir.settings["temp_bias_max"]
# add mb parameter distributions
gdir.settings_filesuffix = control_filesuffix
# prcp_fac
prcp_fac_control = gdir.settings["prcp_fac"]
prcp_fac_bounds = [
max(prcp_fac_control - prcp_fac_sample_interval, prcp_fac_min),
min(prcp_fac_control + prcp_fac_sample_interval, prcp_fac_max),
]
gdir.settings["prcp_fac_min"] = prcp_fac_bounds[0]
gdir.settings["prcp_fac_max"] = prcp_fac_bounds[1]
parameter_distribution["prcp_fac"] = {
"bounds": prcp_fac_bounds,
"dists": "unif",
}
# melt_f
melt_f_control = gdir.settings["melt_f"]
melt_f_bounds = [
max(melt_f_control - melt_f_sample_interval, melt_f_min),
min(melt_f_control + melt_f_sample_interval, melt_f_max),
]
gdir.settings["melt_f_min"] = melt_f_bounds[0]
gdir.settings["melt_f_max"] = melt_f_bounds[1]
parameter_distribution["melt_f"] = {"bounds": melt_f_bounds, "dists": "unif"}
# temp_bias
temp_bias_control = gdir.settings["temp_bias"]
temp_bias_bounds = [
max(temp_bias_control - temp_bias_sample_interval, temp_bias_min),
min(temp_bias_control + temp_bias_sample_interval, temp_bias_max),
]
gdir.settings["temp_bias_min"] = temp_bias_bounds[0]
gdir.settings["temp_bias_max"] = temp_bias_bounds[1]
parameter_distribution["temp_bias"] = {
"bounds": temp_bias_bounds,
"dists": "unif",
}
# now convert to format which is expected by SALib
num_vars = 0
problem = {
"names": [],
"bounds": [],
"dists": [],
}
for key in parameter_distribution:
num_vars += 1
problem["names"].append(key)
problem["bounds"].append(parameter_distribution[key]["bounds"])
problem["dists"].append(parameter_distribution[key]["dists"])
problem["num_vars"] = num_vars
# create a sample of parameters
param_values = sampler.sample(problem, nr_samples, calc_second_order=False)
return param_values, problem["names"]
[docs]
def calculate_and_add_total_runoff(self, ds):
"""Get total annual and monthly runoff from runoff components.
Parameters
----------
ds
Returns
-------
"""
ds["runoff_monthly"] = (
ds["melt_off_glacier_monthly"]
+ ds["melt_on_glacier_monthly"]
+ ds["liq_prcp_off_glacier_monthly"]
+ ds["liq_prcp_on_glacier_monthly"]
)
ds.runoff_monthly.attrs = {
"unit": "kg month-1",
"description": (
"Monthly glacier runoff from sum of monthly melt and liquid "
"precipitation on and off the glacier using a fixed-gauge with "
"a glacier minimum reference area from year 2000"
),
}
ds["runoff"] = (
ds["melt_off_glacier"]
+ ds["melt_on_glacier"]
+ ds["liq_prcp_off_glacier"]
+ ds["liq_prcp_on_glacier"]
)
ds.runoff.attrs = {
"unit": "kg yr-1",
"description": (
"Annual glacier runoff: sum of annual melt and liquid "
"precipitation on and off the glacier using a fixed-gauge with "
"a glacier minimum reference area from year 2000"
),
}
return ds
[docs]
def add_specific_mb(self, ds, mb_model, fls, time_resolution, unit):
"""Get specific mass-balance in mm w.e. from a volume change
time series.
Parameters
----------
ds
mb_model
fls
time_resolution
unit
Returns
-------
"""
specific_mb = mb_model.get_specific_mb(
fls=fls, year=ds.time[:-1].values, time_resolution=time_resolution
)
ds = ds.assign(
specific_mb=xr.DataArray(
specific_mb, dims="time", coords={"time": ds.time[:-1]}
)
.reindex(time=ds.time) # adds last coord, fills with NaN
.broadcast_like(ds.volume)
)
# ds['specific_mb'] = (
# ds.volume.diff(dim='time', label='lower').reindex(time=ds.time) /
# ds.area * cfg.PARAMS['ice_density'])
return self.add_specific_mb_attrs(ds, unit)
[docs]
def add_annual_cumulative(
self,
ds,
var,
year_coord="calendar_year",
cumulative_suffix="calendar_cum",
):
"""
Parameters
----------
ds
var
year_coord
cumulative_suffix
Returns
-------
"""
out_var = f"{var}_{cumulative_suffix}"
# add "calendar_year" if not available
if year_coord not in ds.coords:
# Derive integer year from float-year time coordinate
year = np.floor(ds["time"]).astype(int)
# Attach year as coordinate for grouping
ds = ds.assign_coords({year_coord: ("time", year.data)})
def _cumsum(x):
return x.cumsum(dim="time")
annual_cumulative = ds[var].groupby(year_coord).map(_cumsum)
# Store back into the same dataset and adapt description
ds[out_var] = annual_cumulative
attrs = ds[out_var].attrs
attrs["description"] = f"cumulated over {year_coord}"
ds[out_var].attrs = attrs
return ds
[docs]
def add_specific_mb_attrs(self, ds, unit):
"""
Parameters
----------
ds
unit
Returns
-------
"""
ds.specific_mb.attrs = {"unit": unit, "description": ("Specific mass-balance")}
return ds
[docs]
def add_snowline_attrs(self, ds, inf_values=None):
"""
Parameters
----------
ds
inf_values
Returns
-------
"""
ds.snowline.attrs = {
"unit": "m",
"description": ("Snowline altitude"),
"inf_values": str(inf_values),
}
return ds
[docs]
def get_calibrated_datacubes(
self,
mb_model_class,
ref_mb: float,
ref_mb_err: float,
ref_mb_unit: str = "kg m-2 yr-1",
ref_mb_period: str = "",
gdir: GlacierDirectory = None,
first_guess_settings: str = "",
datacube_types: str | list[str] = "monthly",
calibration_filesuffix: str = "",
calibration_strategy: str = "",
mcs_sampling_settings: dict = None,
calibration_parameters_control: dict = None,
calibration_parameters_mcs: dict = None,
quantiles: list = None,
climate_input_filesuffix: str = None,
show_log: bool = False,
multiprocessing_during_datacube_creation: bool = True,
**kwargs,
) -> dict:
"""Calibrate models and generate datacubes.
Uses the provided reference mass balance to calibrate models and
generate model datacubes. These also include an uncertainty
propagation through a Monte Carlo simulation.
Parameters
----------
gdir : GlacierDirectory
OGGM glacier directory.
mb_model_class : oggm.core.massbalance.MassBalanceModel
OGGM mass balance model class.
ref_mb : float
Reference mass balance.
ref_mb_err : float
Reference mass balance uncertainty.
ref_mb_unit : str, default "kg m-2 yr-1"
Unit used for the reference mass balance.
ref_mb_period : str, optional
Measurement period for the reference mass balance.
first_guess_settings: str, optional
the mb parameters which should be used as a first guess
datacube_types: str or list[str], default "monthly"
Desired datacube types. Select one or provide a list.
Options are:
* ``monthly``: monthly volume, area, length and mass
balance.
* ``annual_hydro``: annual volume, area, length and mass
balance, and annual and monthly runoff components.
* ``daily_smb``: daily mass balance.
calibration_filesuffix: str
The filesuffix used for all outputs of this method.
calibration_strategy: str
Text describing the current calibration strategy, e.g.
"OGGM model X calibrated with data from Y over the period Z."
mcs_sampling_settings: dict, default None
Settings for Monte Carlo simulation parameter sampling. For
default values look at signature of
``Calibrator.get_mcs_parameter_sample``.
calibration_parameters_control: dict, default None
The order of calibration parameters for the control run.
If None, defaults to::
{
"calibrate_param1": "melt_f",
"calibrate_param2": "prcp_fac",
"calibrate_param3": "temp_bias",
}
calibration_parameters_mcs: dict, default None
The order of calibration parameters for the Monte Carlo
simulation runs. If None, defaults to::
{
"calibrate_param1": "prcp_fac",
"calibrate_param2": "melt_f",
"calibrate_param3": "temp_bias",
}
quantiles: list, default None
The quantiles used for the aggregation of the Monte Carlo simulation
results, including labels for the final datacube. If None,
defaults to [0.05, 0.15, 0.25, 0.50, 0.75, 0.85, 0.95].
climate_input_filesuffix: str, optional
The filesuffix of the climate input file.
show_log: bool, default False
Show some basic logs during execution if True.
kwargs: dict
kwargs to pass to the mb_model_class instance.
Returns
-------
dict
The model datacubes generated by using the provided data for
calibration, including uncertainties.
"""
if not calibration_strategy:
raise ValueError(
"You must provide a description of the currently "
"applied calibration strategy in the argument "
"'calibration_strategy'. (e.g. OGGM model X "
"calibrated with data from Y over the period Z.)"
)
if gdir is None:
gdir = self.gdir
if not calibration_filesuffix:
model_name = mb_model_class.__name__
calibration_filesuffix = f"{model_name}_{ref_mb_period}"
if not climate_input_filesuffix:
# for MySfcTypeTIModel we often use partial
climate_input_filesuffix = "_era_monthly"
climate_input_filesuffix_daily = "_era_daily"
if isinstance(mb_model_class, partial):
if mb_model_class.keywords["mb_model_class"].__name__ in [
"DailyTIModel"
]:
climate_input_filesuffix = climate_input_filesuffix_daily
elif mb_model_class.__name__ in ["SfcTypeTIModel"]:
if inspect.signature(mb_model_class.__init__).parameters[
"mb_model_class"
].default.__name__ in ["DailyTIModel"]:
climate_input_filesuffix = climate_input_filesuffix_daily
elif mb_model_class.__name__ in ["DailyTIModel"]:
climate_input_filesuffix = climate_input_filesuffix_daily
# we need to extend the climate file to full calendar years to work with
# OGGM, at the end we restore the original file again
fp_climate = gdir.get_filepath(
"climate_historical", filesuffix=climate_input_filesuffix + "{}"
)
with xr.open_dataset(fp_climate.format("")) as ds_clim:
ds_clim = ds_clim.load()
# save the original file with suffix '_original' for later
ds_clim.to_netcdf(fp_climate.format("_original"))
ds_clim, last_available_date = extend_to_full_calendar_year(ds_clim)
ds_clim.to_netcdf(fp_climate.format(""))
# check if we are working with SfcTypeTIModel for snowline
save_snowline = False
if isinstance(mb_model_class, partial):
if mb_model_class.func.__name__ in ["SfcTypeTIModel"]:
save_snowline = True
elif mb_model_class.__name__ in ["SfcTypeTIModel"]:
save_snowline = True
# we apply the kwargs to the mb_model class here, to be sure all
# subsequent tasks use them correctly
if kwargs:
mb_model_class = partial(mb_model_class, **kwargs)
if quantiles is None:
quantiles = [0.05, 0.15, 0.25, 0.50, 0.75, 0.85, 0.95]
if calibration_parameters_control is None:
calibration_parameters_control = {
"calibrate_param1": "melt_f",
"calibrate_param2": "prcp_fac",
"calibrate_param3": "temp_bias",
}
# define what should be the default settings we use as first guess for
# control calibration
gdir.settings_filesuffix = first_guess_settings
prcp_fac_fg = gdir.settings["prcp_fac"]
melt_f_fg = gdir.settings["melt_f"]
temp_bias_fg = gdir.settings["temp_bias"]
# conduct a control calibration, this is the approach without MCS
control_filesuffix = f"_{calibration_filesuffix}_control"
workflow_output = workflow.execute_entity_task(
tasks.mb_calibration_from_scalar_mb,
gdir,
settings_filesuffix=control_filesuffix,
observations_filesuffix=control_filesuffix,
ref_mb=ref_mb,
ref_mb_unit=ref_mb_unit,
ref_mb_err=ref_mb_err,
ref_mb_period=ref_mb_period,
overwrite_gdir=True,
**calibration_parameters_control,
prcp_fac=prcp_fac_fg,
melt_f=melt_f_fg,
temp_bias=temp_bias_fg,
mb_model_class=mb_model_class,
filename="climate_historical",
input_filesuffix=climate_input_filesuffix,
return_mb_model=True,
)
# the finally calibrated mass balance model is returned because of
# return_mb_model=True
mb_model_control = workflow_output[0][1]
# now sample parameters of control calibration
if mcs_sampling_settings is None:
mcs_sampling_settings = {}
elif not isinstance(mcs_sampling_settings, dict):
raise TypeError("MCS sampling settings must be a dictionary.")
parameter_sample, parameter_names = self.get_mcs_parameter_sample(
gdir=gdir, control_filesuffix=control_filesuffix, **mcs_sampling_settings
)
# now conduct MCS by execute calibration for each sample
gdir.observations_filesuffix = control_filesuffix
ref_mb_control = gdir.observations["ref_mb"]
if show_log:
print(
f"{calibration_filesuffix}:\n"
f" Starting Monte Carlo Simulation for "
f"{parameter_sample.shape[0]} ensemble members."
)
# prepare the settings for individual mcs runs for multiprocessing,
# define here kwargs which are individual for each run
all_mcs_runs = []
all_param_filesuffix = []
for i, param_val in enumerate(parameter_sample):
param_filesuffix = f"_{calibration_filesuffix}_{i}"
all_param_filesuffix.append(param_filesuffix)
all_mcs_runs.append(
(
gdir,
dict(
settings_filesuffix=param_filesuffix,
observations_filesuffix=param_filesuffix,
ref_mb=param_val[parameter_names.index("ref_mb")],
prcp_fac=param_val[parameter_names.index("prcp_fac")],
melt_f=param_val[parameter_names.index("melt_f")],
temp_bias=param_val[parameter_names.index("temp_bias")],
),
)
)
# execute mcs runs with execute_entity_task for multiprocessing,
# kwargs defined below are the same for all runs
old_continue_one_error = cfg.PARAMS["continue_on_error"]
cfg.PARAMS["continue_on_error"] = True
mcs_out = workflow.execute_entity_task(
mcs_mb_calibration_workflow,
all_mcs_runs,
settings_parent_filesuffix=control_filesuffix,
mb_model_class=mb_model_class,
filename="climate_historical",
input_filesuffix=climate_input_filesuffix,
ref_mb_period=ref_mb_control["period"],
ref_mb_err=ref_mb_control["err"],
ref_mb_unit=ref_mb_control["unit"],
calibration_parameters=calibration_parameters_mcs,
)
cfg.PARAMS["continue_on_error"] = old_continue_one_error
if not multiprocessing_during_datacube_creation:
# this is for testing a bug on the cluster
old_multiprocessing = cfg.PARAMS["use_multiprocessing"]
cfg.PARAMS["use_multiprocessing"] = False
# here we keep only working mcs runs and their mb_models
working_samples = []
mb_model_samples = []
for i, out in enumerate(mcs_out):
if out is None:
continue
working_samples.append(all_param_filesuffix[i])
mb_model_samples.append(out)
if show_log:
print(
f" Finished Monte Carlo Simulation: {len(working_samples)} "
f"ensemble members for output aggregation available.\n"
f" Start generating datacubes"
)
datacube_dict = {}
if isinstance(datacube_types, str):
datacube_types = [datacube_types]
for datacube_request in datacube_types:
if show_log:
print(f" Start generation of {datacube_request} datacube")
if datacube_request in ["annual_hydro", "monthly"]:
# define kwargs which are individual for each run
all_sample_runs = []
for sample_filesuffix in working_samples + [control_filesuffix]:
all_sample_runs.append(
(
gdir,
dict(
settings_filesuffix=sample_filesuffix,
output_filesuffix=f"{sample_filesuffix}_{datacube_request}",
),
)
)
if datacube_request == "annual_hydro":
datacube_vars = [
"volume",
"area",
"length",
"off_area",
"on_area",
"melt_off_glacier",
"melt_on_glacier",
"liq_prcp_off_glacier",
"liq_prcp_on_glacier",
"snowfall_off_glacier",
"snowfall_on_glacier",
"melt_off_glacier_monthly",
"melt_on_glacier_monthly",
"liq_prcp_off_glacier_monthly",
"liq_prcp_on_glacier_monthly",
"snowfall_off_glacier_monthly",
"snowfall_on_glacier_monthly",
"runoff_monthly",
"runoff",
"specific_mb",
"runoff_monthly_cumulative",
]
# run dynamic model with different calibration options
# kwargs which are the same for all runs, can be provided directly
workflow.execute_entity_task(
tasks.run_with_hydro,
all_sample_runs,
run_task=tasks.run_from_climate_data,
fixed_geometry_spinup_yr=2000,
ref_area_yr=2000,
ys=2000,
store_monthly_hydro=True,
mb_model_class=mb_model_class,
climate_filename="climate_historical",
climate_input_filesuffix=climate_input_filesuffix,
)
elif datacube_request == "monthly":
datacube_vars = [
"volume",
"area",
"length",
"specific_mb",
"specific_mb_calendar_cum",
]
# run dynamic model with different calibration options
# kwargs which are the same for all runs, can be provided directly
workflow.execute_entity_task(
tasks.run_from_climate_data,
all_sample_runs,
fixed_geometry_spinup_yr=2000,
ys=2000,
store_monthly_step=True,
mb_elev_feedback="monthly",
mb_model_class=mb_model_class,
climate_filename="climate_historical",
climate_input_filesuffix=climate_input_filesuffix,
)
# create a xarray datacube including all ensemble members
if datacube_request in ["annual_hydro", "monthly"]:
# compile run outputs and merge
ds_all = []
# open each ensemble member and add a new coordinate for later
# merging
for sample_filesuffix, mb_model in zip(
working_samples + [control_filesuffix],
mb_model_samples + [mb_model_control],
):
ds_tmp = utils.compile_run_output(
gdir, input_filesuffix=f"{sample_filesuffix}_{datacube_request}"
)
if datacube_request == "annual_hydro":
ds_tmp = self.calculate_and_add_total_runoff(ds_tmp)
cum_var = "runoff_monthly_cumulative"
var = "runoff_monthly"
ds_tmp[cum_var] = ds_tmp[var].cumsum(dim="month_2d")
ds_tmp[cum_var].attrs[
"description"
] = f"Cumulative {ds_tmp[var].attrs['description']}"
ds_tmp[cum_var].attrs["unit"] = "kg"
mb_unit = "mm w.e. yr-1"
time_resolution = "annual"
else:
mb_unit = "mm w.e. month-1"
time_resolution = "monthly"
fls = gdir.read_pickle("inversion_flowlines")
ds_tmp = self.add_specific_mb(
ds_tmp,
mb_model=mb_model,
fls=fls,
time_resolution=time_resolution,
unit=mb_unit,
)
if datacube_request == "monthly":
# add cumulative ouput
ds_tmp = self.add_annual_cumulative(ds_tmp, var="specific_mb")
ds_tmp["specific_mb_calendar_cum"].attrs["unit"] = "mm w.e."
ds_tmp = ds_tmp[datacube_vars]
# keep only data from the orignal climate file (we need full
# calendar years to work within OGGM)
if datacube_request == "monthly":
ds_tmp = ds_tmp.sel(
time=slice(
None,
utils.date_to_floatyear(
y=last_available_date.year,
m=last_available_date.month,
),
)
)
elif datacube_request == "annual_hydro":
ds_tmp = mask_mixed_year_month(
ds=ds_tmp, last_available=last_available_date
)
else:
raise NotImplementedError(f"{datacube_request}")
# add resulting dataset for merging
if sample_filesuffix == control_filesuffix:
ds_control = ds_tmp.expand_dims(member=["Control"])
ds_tmp.coords["sample_name"] = sample_filesuffix
ds_tmp = ds_tmp.expand_dims("sample_name")
ds_all.append(ds_tmp)
# merge all ensemble members into a single dataset
ds_all = xr.combine_by_coords(
ds_all, fill_value=np.nan, combine_attrs="override"
)
elif datacube_request == "daily_smb":
# define daily float year timeseries
y1 = int(mb_model_control.ye)
mb_years = utils.float_years_timeseries(
y0=2000, y1=y1, include_last_year=True, daily=True
)
# loop through all ensemble members and get specific mb
smb_all = []
if save_snowline:
snowline_all = []
fls = gdir.read_pickle("inversion_flowlines")
for sample_filesuffix, mb_model in zip(
working_samples + [control_filesuffix],
mb_model_samples + [mb_model_control],
):
smb_tmp = mb_model.get_specific_mb(
fls=fls, year=mb_years, time_resolution="daily"
)
if save_snowline:
# use isclose to avoid rounding errors
snowline_yr = np.isclose(
mb_model.snowline_year[:, None], mb_years, rtol=1e-10
).any(axis=1)
assert snowline_yr.sum() == len(mb_years)
snowline_tmp = mb_model.snowline[snowline_yr]
# set inf values for quantile computation
inf_values = mb_model.snowline_inf_values
if np.inf in inf_values:
snowline_tmp = np.where(
np.isposinf(snowline_tmp),
inf_values[np.inf],
snowline_tmp,
)
if -np.inf in inf_values:
snowline_tmp = np.where(
np.isneginf(snowline_tmp),
inf_values[-np.inf],
snowline_tmp,
)
if sample_filesuffix == control_filesuffix:
smb_control = smb_tmp
if save_snowline:
snowline_control = snowline_tmp
smb_all.append(smb_tmp)
if save_snowline:
snowline_all.append(snowline_tmp)
# combine all ensemble members in a dataset for later quantile
# computation
ds_all = xr.Dataset(
data_vars=dict(
specific_mb=(["sample_name", "time"], np.array(smb_all)),
),
coords=dict(
sample_name=working_samples + [control_filesuffix],
time=mb_years,
),
)
ds_all["time"].attrs = {"description": "Floating year"}
ds_all = self.add_specific_mb_attrs(ds_all, unit="mm w.e. day-1")
if save_snowline:
ds_all["snowline"] = (
("sample_name", "time"),
np.array(snowline_all),
)
ds_all = self.add_snowline_attrs(ds_all, inf_values=inf_values)
ds_control = xr.Dataset(
data_vars=dict(specific_mb=(["member", "time"], [smb_control])),
coords=dict(member=["Control"], time=mb_years),
)
ds_control["time"].attrs = {"description": "Floating year"}
ds_control = self.add_specific_mb_attrs(
ds_control, unit="mm w.e. day-1"
)
if save_snowline:
ds_control["snowline"] = (("member", "time"), [snowline_control])
ds_control = self.add_snowline_attrs(
ds_control, inf_values=inf_values
)
# add cumulative ouput
ds_all = self.add_annual_cumulative(ds_all, var="specific_mb")
ds_control = self.add_annual_cumulative(ds_control, var="specific_mb")
ds_all["specific_mb_calendar_cum"].attrs["unit"] = "mm w.e."
ds_control["specific_mb_calendar_cum"].attrs["unit"] = "mm w.e."
# keep only data from the orignal climate file (we need full
# calendar years to work within OGGM)
last_available_floatyear = utils.date_to_floatyear(
y=last_available_date.year,
m=last_available_date.month,
d=last_available_date.day,
)
ds_all = ds_all.sel(time=slice(None, last_available_floatyear))
ds_control = ds_control.sel(time=slice(None, last_available_floatyear))
else:
raise NotImplementedError(f"{datacube_request}")
# calculate quantiles and rename, this is the same for all datacubes
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="All-NaN slice encountered",
category=RuntimeWarning,
)
ds_all = ds_all.quantile(q=quantiles, dim="sample_name")
ds_all = ds_all.rename({"quantile": "member"})
# convert float quantile values to str, for mixing with 'Control'
# make sure to match ordering
labels = [str(v) for v in ds_all.member.values]
ds_all = ds_all.assign_coords(member=("member", labels))
# add control run as an extra member
ds_all = xr.concat([ds_control, ds_all], dim="member")
ds_all.coords["member"].attrs = {
"description": "Members include Monte Carlo ensemble percentiles "
f"(calculated out of {len(working_samples)} "
"ensemble members) and a Control run calibrated "
"using median input values."
}
# convert placeholder inf values back for snowline
if "snowline" in ds_all:
if np.inf in inf_values:
ds_all["snowline"] = xr.where(
ds_all["snowline"] != inf_values[np.inf],
ds_all["snowline"],
np.inf,
keep_attrs=True,
)
if -np.inf in inf_values:
ds_all["snowline"] = xr.where(
ds_all["snowline"] != inf_values[-np.inf],
ds_all["snowline"],
-np.inf,
keep_attrs=True,
)
# add RGI-ID attribute
ds_all.attrs["RGI-ID"] = self.RGI_ID_attrs
# add calibration strategy
ds_all.attrs["calibration_strategy"] = calibration_strategy
datacube_dict[datacube_request] = ds_all
if show_log:
print(f" Finished generation of {datacube_request} datacube")
# we save the original climate data back to the gdir
with xr.open_dataset(fp_climate.format("_original")) as ds_clim:
ds_clim = ds_clim.load()
ds_clim.to_netcdf(fp_climate.format(""))
os.remove(fp_climate.format("_original"))
if show_log:
print(f" Finished generating datacubes\n")
if not multiprocessing_during_datacube_creation:
cfg.PARAMS["use_multiprocessing"] = old_multiprocessing
return datacube_dict
[docs]
def get_ref_mb(
self,
gdir: GlacierDirectory,
ref_mb_period: str = None,
datacube_l1: xr.Dataset = None,
source: str = "Hugonnet",
) -> tuple:
"""Get observed mass balance for a specific reference period.
Parameters
----------
gdir
datacube_l1: xr.Dataset
If the reference mass balance should be derived from an observation
data, which is available in a L1 datacube.
ref_mb_period: str
Reference calibration period in the format <YYYY-MM-DD_YYYY-MM-DD>.
source: str
Define the observation source
Returns
-------
tuple
ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period: the reference mass
balance to match with unit, associated uncertainty and valid period.
"""
if source == "Hugonnet":
if gdir is None:
gdir = self.gdir
pd_geodetic = utils.get_geodetic_mb_dataframe()
df_ref_mb = pd_geodetic.loc[gdir.rgi_id]
df_ref_mb = df_ref_mb.loc[df_ref_mb.period == ref_mb_period]
ref_mb = df_ref_mb.dmdtda.item() * 1000
ref_mb_unit = "kg m-2 yr-1"
ref_mb_err = df_ref_mb.err_dmdtda.item() * 1000
return ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period
else:
raise NotImplementedError(f"Source {source} is not implemented.")
[docs]
def calibrate(
self,
gdir: GlacierDirectory,
model_matrix: dict = None,
datacube_l1: xr.Dataset = None,
ignore_errors: bool = False,
**kwargs,
) -> dict:
"""Calibrate the mass balance model and create L2 datacubes using a
model matrix.
Parameters
----------
gdir: GlacierDirectory
Glacier directory.
model_matrix: dict
Model parameters for various calibration runs.
datacube_l1: xr.Dataset
If the reference mass balance should be derived from an observation
data, which is available in a L1 datacube.
ignore_errors: bool
If True any exception raised during the calibration is ignored.
kwargs: dict
kwargs passed on to Calibrator.get_calibrated_datacubes
Returns
-------
dict
Containing the created L2 datacubes for each set of configuration
parameters in ``model_matrix``
"""
# Store results
datacubes_l2 = {}
if datacube_l1 is None:
datacube_l1 = self.datacube_l1
if model_matrix is None:
model_matrix = self.model_matrix
for matrix_name, model_params in tqdm(model_matrix.items()):
try:
mb_model_class = model_params["model"]
ref_mb_period = model_params["ref_mb_period"]
source = model_params["source"]
calibration_filesuffix = f"{matrix_name}_{ref_mb_period}"
ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period = self.get_ref_mb(
gdir=gdir,
datacube_l1=datacube_l1,
ref_mb_period=ref_mb_period,
source=source,
)
if source == "Hugonnet":
source_description = "Hugonnet et al. (2021)"
else:
source_description = source
if isinstance(mb_model_class, partial):
mb_model_name = mb_model_class.func.__name__
else:
mb_model_name = mb_model_class.__name__
calibration_strategy = (
f"OGGM mass-balance model '{mb_model_name}' "
f"calibrated to match data from {source_description} in the "
f"period {ref_mb_period}."
)
datacubes_l2[matrix_name] = self.get_calibrated_datacubes(
gdir=gdir,
mb_model_class=mb_model_class,
ref_mb=ref_mb,
ref_mb_err=ref_mb_err,
ref_mb_unit=ref_mb_unit,
ref_mb_period=ref_mb_period,
calibration_filesuffix=calibration_filesuffix,
calibration_strategy=calibration_strategy,
**kwargs,
)
except Exception as e:
if not ignore_errors:
raise
print(f"Calibration for {matrix_name} not successful: {e}")
return datacubes_l2
[docs]
@entity_task(log)
def mcs_mb_calibration_workflow(
gdir,
settings_filesuffix,
settings_parent_filesuffix,
observations_filesuffix,
mb_model_class,
filename,
input_filesuffix,
ref_mb,
ref_mb_period,
ref_mb_err,
ref_mb_unit,
prcp_fac,
melt_f,
temp_bias,
calibration_parameters=None,
):
"""
This defines the calibration workflow executed for each ensemble member
of the Monte Carlo simulation.
Parameters
----------
gdir
settings_filesuffix
settings_parent_filesuffix: str
the settings where from where we use the bounds for the mb
parameters prcp_fac, melt_f and temp_bias
observations_filesuffix
mb_model_class
filename: str
climatic filename
input_filesuffix: str
climatic filenamesuffix
ref_mb
ref_mb_period
ref_mb_err
ref_mb_unit
prcp_fac
melt_f
temp_bias
calibration_parameters: dict
here you can define the order of calibration parameters, the default
is {"calibrate_param1": "prcp_fac", "calibrate_param2": "melt_f",
"calibrate_param3": "temp_bias",}.
Returns
-------
"""
if calibration_parameters is None:
calibration_parameters = {
"calibrate_param1": "prcp_fac",
"calibrate_param2": "melt_f",
"calibrate_param3": "temp_bias",
}
# add observations data to observations file
gdir.observations_filesuffix = observations_filesuffix
gdir.observations["ref_mb"] = {
"err": ref_mb_err,
"period": ref_mb_period,
"value": ref_mb,
"unit": ref_mb_unit,
}
# get bounds from parent
bounds_to_define = [
"prcp_fac_min",
"prcp_fac_max",
"melt_f_min",
"melt_f_max",
"temp_bias_min",
"temp_bias_max",
]
gdir.settings_filesuffix = settings_parent_filesuffix
default_bounds = {}
for bound in bounds_to_define:
default_bounds[bound] = gdir.settings[bound]
# create the new settings file with the correct parent_filesuffix
utils.ModelSettings(
gdir, filesuffix=settings_filesuffix, reset_parent_filesuffix=True
)
gdir.settings_filesuffix = settings_filesuffix
gdir.settings["use_winter_prcp_fac"] = False
# set parent bounds
for bound in bounds_to_define:
gdir.settings[bound] = default_bounds[bound]
# the settings here mimic mb_calibration_from_hugonnet_mb
workflow_output = workflow.execute_entity_task(
massbalance.mb_calibration_from_scalar_mb,
gdir,
settings_filesuffix=settings_filesuffix,
observations_filesuffix=observations_filesuffix,
overwrite_gdir=True,
**calibration_parameters,
prcp_fac=prcp_fac,
melt_f=melt_f,
temp_bias=temp_bias,
mb_model_class=mb_model_class,
filename=filename,
input_filesuffix=input_filesuffix,
return_mb_model=True,
)
# the finally calibrated mass balance model is returned because of
# return_mb_model=True
mb_model = workflow_output[0][1]
return mb_model
[docs]
def extend_to_full_calendar_year(
ds: xr.Dataset, time_dim: str = "time", resolution: str = ""
) -> tuple | xr.Dataset:
"""Extend an xarray Dataset along `time_dim` to the end of the last
calendar year present.
Any newly added timestamps are filled with the last available values
(LOCF).
Parameters
----------
ds : xr.Dataset
time_dim : str, default "time"
resolution : str, optional
- daily: extend to YYYY-12-31 (daily frequency).
- monthly: extend to YYYY-12-01 assuming monthly start
timestamps (MS).
"""
if time_dim not in ds.coords:
raise ValueError(f"Dataset has no coordinate '{time_dim}'.")
t = ds[time_dim].to_index()
if len(t) == 0:
return ds
if not resolution:
# Infer from median step in days. >= ~28 -> monthly, else daily.
dt_days = pd.Series(t).diff().dropna().dt.total_seconds() / 86400.0
med = float(dt_days.median()) if len(dt_days) else np.nan
resolution = "monthly" if (np.isnan(med) or med >= 28) else "daily"
last = t[-1]
year_end = pd.Timestamp(year=last.year, month=12, day=31)
if resolution.lower() == "daily":
new_time = pd.date_range(t[0], year_end, freq="D")
elif resolution.lower() == "monthly":
# Monthly dataset assumed to be on month start (YYYY-MM-01).
# Ends December 1.
end_ms = pd.Timestamp(year=last.year, month=12, day=1)
new_time = pd.date_range(t[0], end_ms, freq="MS")
else:
raise ValueError(
"Accepted resolutions are: 'daily', 'monthly', or leave blank to infer."
)
# Reindex introduces NaNs at new timestamps; ffill carries last observed value forward.
out = ds.reindex({time_dim: new_time}).ffill(time_dim)
return out, last
[docs]
def mask_mixed_year_month(ds: xr.Dataset, last_available: pd.Timestamp) -> xr.Dataset:
"""
Parameters
----------
ds
last_available
Returns
-------
"""
last_year = last_available.year
last_month = last_available.month
if last_month < 12:
year_mask = ds["time"] < last_year # dims: (time,)
else:
year_mask = ds["time"] <= last_year # dims: (time,)
ym_mask = (ds["time"] < last_year) | (
(ds["time"] == last_year) & (ds["month_2d"] <= last_month)
)
# ym_mask dims: (time, month_2d)
out_vars = {}
for name, da in ds.data_vars.items():
dims = set(da.dims)
if "month_2d" in dims and "time" in dims:
out_vars[name] = da.where(ym_mask)
elif "time" in dims and "month_2d" not in dims:
out_vars[name] = da.where(year_mask)
else:
# unchanged for variables not depending on year/month
out_vars[name] = da
return xr.Dataset(out_vars, coords=ds.coords, attrs=ds.attrs)
[docs]
class CalibratorCryotempo(Calibrator):
[docs]
def __init__(self, model_matrix: dict = None, datacube_l1=None, gdir=None):
super().__init__(model_matrix=model_matrix, datacube_l1=datacube_l1, gdir=gdir)
[docs]
def set_model_matrix(
self,
name: str = "SfcType_Cryosat",
model=massbalance.SfcTypeTIModel,
geo_period: str = "2011-01-01_2020-01-01",
daily: bool = True,
source: str = "CryoTEMPO-EOLIS",
**kwargs,
):
"""Set model parameters for calibration.
Parameters
----------
name : str
Name of configuration parameters. Ideally this should match
the settings filesuffix.
model : oggm.core.massbalance.MassBalanceModel
OGGM mass balance model class.
geo_period : str
Reference mass balance period in the format
YYYY-MM-DD_YYYY-MM-DD.
kwargs
Additional arguments passed to the mass balance model
class.
"""
return super().set_model_matrix(
name,
model,
geo_period,
daily=daily,
source=source,
**kwargs,
)
[docs]
def get_eolis_dates(self, ds: xr.Dataset) -> np.ndarray:
"""Get time index from CryoTEMPO-EOLIS dataset.
Parameters
----------
ds : xr.Dataset
CryoTEMPO-EOLIS dataset.
Returns
-------
np.ndarray
Time index adjusted to UTC.
"""
if isinstance(ds.t.values[0], np.datetime64):
out = pd.to_datetime(ds.t.values, utc=True).to_pydatetime()
return np.array([dt.astimezone(tz.tzutc()) for dt in out])
else:
# we assume the type is int
return np.array([datetime.fromtimestamp(t, tz=UTC) for t in ds.t.values])
[docs]
def get_eolis_mean_dh(self, ds: xr.Dataset) -> np.ndarray:
"""Get mean elevation change from EOLIS data.
Parameters
----------
ds : xr.Dataset
CryoTEMPO-EOLIS dataset.
Returns
-------
np.ndarray
Time series of spatial mean of elevation change.
"""
mean_time_series = [
np.nanmean(elevation_change_map.where(ds.glacier_mask == 1))
for elevation_change_map in ds.eolis_gridded_elevation_change
]
return np.array(mean_time_series)
[docs]
def get_temporal_bounds(
self, dates: ArrayLike, date_start: str, date_end: str
) -> tuple:
"""Get start and end dates of geodetic and observational periods.
This matches the nearest available dates for irregular time
indices.
Parameters
----------
dates : array_like
Time index. This may be irregularly spaced.
date_start : str
Start year of a desired period.
date_end : str
End year of a desired period.
Returns
-------
tuple[datetime]
Nearest available start and end dates for observations.
"""
# EOLIS data is in 30-day periods, so get closest available date
data_start = self.get_nearest_datetime(dates, date_start)
data_end = self.get_nearest_datetime(dates, date_end)
return data_start, data_end
[docs]
def get_geodetic_mb_from_dataset(
self,
dataset: xr.Dataset,
date_start="2011-01-01",
date_end="2020-01-01",
) -> tuple:
"""Get the geodetic mass balance in kg m-2 from enhanced gridded data.
Parameters
----------
dataset : xr.Dataset
CryoTEMPO-EOLIS dataset.
date_start : str, default "2011-01-01"
Start of reference period.
date_end : str, default "2020-01-01"
End of reference period.
Returns
-------
tuple
ref_mb, ref_mb_unit, ref_mb_err: the reference mass balance to match
with unit and associated uncertainty.
"""
dates = self.get_eolis_dates(dataset)
data_start, data_end = self.get_temporal_bounds(
dates=dates, date_start=date_start, date_end=date_end
)
ref_mb_period = (
f"{data_start.strftime('%Y-%m-%d')}_" f"{data_end.strftime('%Y-%m-%d')}"
)
calib_frame = pd.DataFrame(
{
"dh": dataset.eolis_elevation_change_timeseries,
"dh_sigma": dataset.eolis_elevation_change_sigma_timeseries,
},
index=dates,
)
# ref_mb in kg m-2, area not needed as we already have a mean dh
# (dh = dV / A)
bulk_density = 850
dh = calib_frame["dh"].loc[data_end] - calib_frame["dh"].loc[data_start]
ref_mb = dh * bulk_density
ref_mb_unit = "kg m-2"
# Error propagation with correlation
relative_dt = relativedelta(data_end, data_start)
k = abs(relative_dt.years * 12 + relative_dt.months)
correlation_coeff = np.maximum(0, 1 - k / 3)
dh_err = np.sqrt(
calib_frame["dh_sigma"].loc[data_start] ** 2
+ calib_frame["dh_sigma"].loc[data_end] ** 2
- 2
* correlation_coeff
* calib_frame["dh_sigma"].loc[data_start]
* calib_frame["dh_sigma"].loc[data_end]
)
ref_mb_err = dh_err * bulk_density
return ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period
[docs]
def get_ref_mb(
self,
gdir: GlacierDirectory = None,
ref_mb_period: str = None,
datacube_l1: xr.Dataset = None,
source: str = "Hugonnet",
) -> tuple:
"""Get observed mass balance for a specific reference period.
Parameters
----------
gdir
datacube_l1: xr.Dataset
If the reference mass balance should be derived from an observation
data, which is available in a L1 datacube.
ref_mb_period: str
Reference calibration period in the format <YYYY-MM-DD_YYYY-MM-DD>.
source: str
Define the observation source
Returns
-------
tuple
ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period: the reference mass
balance to match with unit, associated uncertainty and valid period.
"""
if source == "CryoTEMPO-EOLIS":
dates = ref_mb_period.split("_")
date_start = dates[0]
date_end = dates[1]
if datacube_l1 is None:
datacube_l1 = self.datacube_l1
ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period = (
self.get_geodetic_mb_from_dataset(
dataset=datacube_l1,
date_start=date_start,
date_end=date_end,
)
)
return ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period
else:
return super().get_ref_mb(
gdir=gdir,
datacube_l1=datacube_l1,
ref_mb_period=ref_mb_period,
source=source,
)