Source code for dtcg.integration.calibration

"""Copyright 2025 DTCG Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


=====

Calibrate OGGM models.
"""

import inspect
import logging
import os
import warnings
from datetime import datetime
from functools import partial

import numpy as np
import pandas as pd
import SALib.sample.sobol as sampler
import xarray as xr
from dateutil import tz
from dateutil.relativedelta import relativedelta
from dateutil.tz import UTC
from numpy.typing import ArrayLike
from oggm import GlacierDirectory, cfg, entity_task, tasks, utils, workflow
from oggm.core import massbalance
from tqdm import tqdm

# Module logger
log = logging.getLogger(__name__)


[docs] class Calibrator: """Bindings for calibrating OGGM models. Attributes ---------- model_matrix : dict, default None Matrix of model calibration parameters for different run configurations. datacube_l1 : Any, default None gdir : GlacierDirectory, default None An OGGM glacier directory. """
[docs] def __init__(self, model_matrix: dict = None, datacube_l1=None, gdir=None): if not model_matrix: self.model_matrix = {} else: self.model_matrix = model_matrix self.gdir = gdir self.datacube_l1 = datacube_l1 if datacube_l1 is not None: self.RGI_ID_attrs = datacube_l1.attrs["RGI-ID"] else: self.RGI_ID_attrs = "Not provided"
[docs] def set_model_matrix(self, name: str, model, geo_period: str, **kwargs): """Set model parameters for calibration. Parameters ---------- name : str Name of configuration parameters. Ideally this should match the settings filesuffix. model : oggm.core.massbalance.MassBalanceModel OGGM mass balance model class. geo_period : str Reference mass balance period in the format YYYY-MM-DD_YYYY-MM-DD. kwargs Additional arguments passed to the mass balance model class. """ matrix = { name: { "model": model, "geo_period": geo_period, } } if kwargs: matrix[name].update(kwargs) self.model_matrix.update(matrix)
[docs] def get_nearest_datetime(self, dates: np.ndarray, target_date: str): """Get the nearest datetime. Parameters ---------- dates : array_like A range of dates that can be converted to ``datetime64``. target_date: str Date in the ISO-8601 format "YYYY-MM-DD". Returns ------- """ with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message="no explicit representation of timezones available for np.datetime64", category=UserWarning, ) dates_ns = dates.astype("datetime64[ns]") target_ns = np.datetime64(datetime.strptime(target_date, "%Y-%m-%d"), "ns") diffs = np.abs(dates_ns - target_ns) idx = int(np.argmin(diffs)) return dates[idx]
[docs] def get_mcs_parameter_sample( self, gdir: GlacierDirectory, control_filesuffix: str, nr_samples: int = 2**4, ref_mb_err_sample_interval: float = 1.0, prcp_fac_sample_interval: float = 0.5, temp_bias_sample_interval: float = 0.5, melt_f_sample_interval: float = 0.25, ): """Defines the distribution of the observation and model parameters and create a sample for the Monte Carlo simulation. Parameters ---------- gdir : GlacierDirectory control_filesuffix: str The filesuffix for the control run's settings, which used only the most likely values of the default calibration. nr_samples: int, default 2^4. Number of samples. Must be a multiple of 2. The total number of generated samples equals ``nr_samples * (4 + 2)``, where 4 is the number of variables used for sampling (``ref_mb``, ``prcp_fac``, ``melt_f``, ``temp_bias``). ref_mb_err_sample_interval: float, default 1.0 Multiplication factor applied to ``ref_mb_err``. It assumes ``ref_mb`` has a normal distribution. prcp_fac_sample_interval: float, default 0.5 The width of the bounds which are added and subtracted from the ``prcp_fac`` of the control run. A uniform distribution is used for sampling. temp_bias_sample_interval: float, default 0.5 The width of the bounds which are added and subtracted from the ``temp_bias`` of the control run. A uniform distribution is used for sampling. melt_f_sample_interval: float, default 0.5 The width of the bounds which are added and subtracted from the ``melt_f`` of the control run. A uniform distribution is used for sampling. Returns ------- tuple """ # to have a better overview I provide the parameter distribution # in a more readable way first parameter_distribution = {} # add geodetic mb observation gdir.observations_filesuffix = control_filesuffix ref_mb = gdir.observations["ref_mb"] parameter_distribution["ref_mb"] = { "bounds": [ref_mb["value"], ref_mb["err"] * ref_mb_err_sample_interval], "dists": "norm", } # get default maximum values for mb parameters gdir.settings_filesuffix = "" prcp_fac_min = gdir.settings["prcp_fac_min"] prcp_fac_max = gdir.settings["prcp_fac_max"] melt_f_min = gdir.settings["melt_f_min"] melt_f_max = gdir.settings["melt_f_max"] temp_bias_min = gdir.settings["temp_bias_min"] temp_bias_max = gdir.settings["temp_bias_max"] # add mb parameter distributions gdir.settings_filesuffix = control_filesuffix # prcp_fac prcp_fac_control = gdir.settings["prcp_fac"] prcp_fac_bounds = [ max(prcp_fac_control - prcp_fac_sample_interval, prcp_fac_min), min(prcp_fac_control + prcp_fac_sample_interval, prcp_fac_max), ] gdir.settings["prcp_fac_min"] = prcp_fac_bounds[0] gdir.settings["prcp_fac_max"] = prcp_fac_bounds[1] parameter_distribution["prcp_fac"] = { "bounds": prcp_fac_bounds, "dists": "unif", } # melt_f melt_f_control = gdir.settings["melt_f"] melt_f_bounds = [ max(melt_f_control - melt_f_sample_interval, melt_f_min), min(melt_f_control + melt_f_sample_interval, melt_f_max), ] gdir.settings["melt_f_min"] = melt_f_bounds[0] gdir.settings["melt_f_max"] = melt_f_bounds[1] parameter_distribution["melt_f"] = {"bounds": melt_f_bounds, "dists": "unif"} # temp_bias temp_bias_control = gdir.settings["temp_bias"] temp_bias_bounds = [ max(temp_bias_control - temp_bias_sample_interval, temp_bias_min), min(temp_bias_control + temp_bias_sample_interval, temp_bias_max), ] gdir.settings["temp_bias_min"] = temp_bias_bounds[0] gdir.settings["temp_bias_max"] = temp_bias_bounds[1] parameter_distribution["temp_bias"] = { "bounds": temp_bias_bounds, "dists": "unif", } # now convert to format which is expected by SALib num_vars = 0 problem = { "names": [], "bounds": [], "dists": [], } for key in parameter_distribution: num_vars += 1 problem["names"].append(key) problem["bounds"].append(parameter_distribution[key]["bounds"]) problem["dists"].append(parameter_distribution[key]["dists"]) problem["num_vars"] = num_vars # create a sample of parameters param_values = sampler.sample(problem, nr_samples, calc_second_order=False) return param_values, problem["names"]
[docs] def calculate_and_add_total_runoff(self, ds): """Get total annual and monthly runoff from runoff components. Parameters ---------- ds Returns ------- """ ds["runoff_monthly"] = ( ds["melt_off_glacier_monthly"] + ds["melt_on_glacier_monthly"] + ds["liq_prcp_off_glacier_monthly"] + ds["liq_prcp_on_glacier_monthly"] ) ds.runoff_monthly.attrs = { "unit": "kg month-1", "description": ( "Monthly glacier runoff from sum of monthly melt and liquid " "precipitation on and off the glacier using a fixed-gauge with " "a glacier minimum reference area from year 2000" ), } ds["runoff"] = ( ds["melt_off_glacier"] + ds["melt_on_glacier"] + ds["liq_prcp_off_glacier"] + ds["liq_prcp_on_glacier"] ) ds.runoff.attrs = { "unit": "kg yr-1", "description": ( "Annual glacier runoff: sum of annual melt and liquid " "precipitation on and off the glacier using a fixed-gauge with " "a glacier minimum reference area from year 2000" ), } return ds
[docs] def add_specific_mb(self, ds, mb_model, fls, time_resolution, unit): """Get specific mass-balance in mm w.e. from a volume change time series. Parameters ---------- ds mb_model fls time_resolution unit Returns ------- """ specific_mb = mb_model.get_specific_mb( fls=fls, year=ds.time[:-1].values, time_resolution=time_resolution ) ds = ds.assign( specific_mb=xr.DataArray( specific_mb, dims="time", coords={"time": ds.time[:-1]} ) .reindex(time=ds.time) # adds last coord, fills with NaN .broadcast_like(ds.volume) ) # ds['specific_mb'] = ( # ds.volume.diff(dim='time', label='lower').reindex(time=ds.time) / # ds.area * cfg.PARAMS['ice_density']) return self.add_specific_mb_attrs(ds, unit)
[docs] def add_annual_cumulative( self, ds, var, year_coord="calendar_year", cumulative_suffix="calendar_cumulative", ): """ Parameters ---------- ds var year_coord cumulative_suffix Returns ------- """ out_var = f"{var}_{cumulative_suffix}" # add "calendar_year" if not available if year_coord not in ds.coords: # Derive integer year from float-year time coordinate year = np.floor(ds["time"]).astype(int) # Attach year as coordinate for grouping ds = ds.assign_coords({year_coord: ("time", year.data)}) def _cumsum(x): return x.cumsum(dim="time") annual_cumulative = ds[var].groupby(year_coord).map(_cumsum) # Store back into the same dataset and adapt description ds[out_var] = annual_cumulative attrs = ds[out_var].attrs attrs["description"] = f"cumulated over {year_coord}" ds[out_var].attrs = attrs return ds
[docs] def add_specific_mb_attrs(self, ds, unit): """ Parameters ---------- ds unit Returns ------- """ ds.specific_mb.attrs = {"unit": unit, "description": ("Specific mass-balance")} return ds
[docs] def add_snowline_attrs(self, ds, inf_values=None): """ Parameters ---------- ds inf_values Returns ------- """ ds.snowline.attrs = { "unit": "m", "description": ("Snowline altitude"), "inf_values": str(inf_values), } return ds
[docs] def get_calibrated_datacubes( self, mb_model_class, ref_mb: float, ref_mb_err: float, ref_mb_unit: str = "kg m-2 yr-1", ref_mb_period: str = "", gdir: GlacierDirectory = None, first_guess_settings: str = "", datacube_types: str | list[str] = "monthly", calibration_filesuffix: str = "", calibration_strategy: str = "", mcs_sampling_settings: dict = None, calibration_parameters_control: dict = None, calibration_parameters_mcs: dict = None, quantiles: list = None, climate_input_filesuffix: str = "", show_log: bool = False, multiprocessing_during_datacube_creation: bool = True, **kwargs, ) -> dict: """Calibrate models and generate datacubes. Uses the provided reference mass balance to calibrate models and generate model datacubes. These also include an uncertainty propagation through a Monte Carlo simulation. Parameters ---------- gdir : GlacierDirectory OGGM glacier directory. mb_model_class : oggm.core.massbalance.MassBalanceModel OGGM mass balance model class. ref_mb : float Reference mass balance. ref_mb_err : float Reference mass balance uncertainty. ref_mb_unit : str, default "kg m-2 yr-1" Unit used for the reference mass balance. ref_mb_period : str, optional Measurement period for the reference mass balance. first_guess_settings: str, optional the mb parameters which should be used as a first guess datacube_types: str or list[str], default "monthly" Desired datacube types. Select one or provide a list. Options are: * ``monthly``: monthly volume, area, length and mass balance. * ``annual_hydro``: annual volume, area, length and mass balance, and annual and monthly runoff components. * ``daily_smb``: daily mass balance. calibration_filesuffix: str The filesuffix used for all outputs of this method. calibration_strategy: str Text describing the current calibration strategy, e.g. "OGGM model X calibrated with data from Y over the period Z." mcs_sampling_settings: dict, default None Settings for Monte Carlo simulation parameter sampling. For default values look at signature of ``Calibrator.get_mcs_parameter_sample``. calibration_parameters_control: dict, default None The order of calibration parameters for the control run. If None, defaults to:: { "calibrate_param1": "melt_f", "calibrate_param2": "prcp_fac", "calibrate_param3": "temp_bias", } calibration_parameters_mcs: dict, default None The order of calibration parameters for the Monte Carlo simulation runs. If None, defaults to:: { "calibrate_param1": "prcp_fac", "calibrate_param2": "melt_f", "calibrate_param3": "temp_bias", } quantiles: list, default None The quantiles used for the aggregation of the Monte Carlo simulation results, including labels for the final datacube. If None, defaults to [0.05, 0.15, 0.25, 0.50, 0.75, 0.85, 0.95]. climate_input_filesuffix: str, optional The filesuffix of the climate input file. show_log: bool, default False Show some basic logs during execution if True. kwargs: dict kwargs to pass to the mb_model_class instance. Returns ------- dict The model datacubes generated by using the provided data for calibration, including uncertainties. """ if not calibration_strategy: raise ValueError( "You must provide a description of the currently " "applied calibration strategy in the argument " "'calibration_strategy'. (e.g. OGGM model X " "calibrated with data from Y over the period Z.)" ) if gdir is None: gdir = self.gdir if not calibration_filesuffix: model_name = mb_model_class.__name__ calibration_filesuffix = f"{model_name}_{ref_mb_period}" if not climate_input_filesuffix: # for MySfcTypeTIModel we often use partial climate_input_filesuffix = "_era_monthly" climate_input_filesuffix_daily = "_era_daily" if isinstance(mb_model_class, partial): if mb_model_class.keywords["mb_model_class"].__name__ in [ "DailyTIModel" ]: climate_input_filesuffix = climate_input_filesuffix_daily elif mb_model_class.__name__ in ["SfcTypeTIModel"]: if inspect.signature(mb_model_class.__init__).parameters[ "mb_model_class" ].default.__name__ in ["DailyTIModel"]: climate_input_filesuffix = climate_input_filesuffix_daily elif mb_model_class.__name__ in ["DailyTIModel"]: climate_input_filesuffix = climate_input_filesuffix_daily # we need to extend the climate file to full calendar years to work with # OGGM, at the end we restore the original file again fp_climate = gdir.get_filepath( "climate_historical", filesuffix=climate_input_filesuffix + "{}" ) with xr.open_dataset(fp_climate.format("")) as ds_clim: ds_clim = ds_clim.load() # save the original file with suffix '_original' for later ds_clim.to_netcdf(fp_climate.format("_original")) ds_clim, last_available_date = extend_to_full_calendar_year(ds_clim) ds_clim.to_netcdf(fp_climate.format("")) # check if we are working with SfcTypeTIModel for snowline save_snowline = False if isinstance(mb_model_class, partial): if mb_model_class.func.__name__ in ["SfcTypeTIModel"]: save_snowline = True elif mb_model_class.__name__ in ["SfcTypeTIModel"]: save_snowline = True # we apply the kwargs to the mb_model class here, to be sure all # subsequent tasks use them correctly if kwargs: mb_model_class = partial(mb_model_class, **kwargs) if quantiles is None: quantiles = [0.05, 0.15, 0.25, 0.50, 0.75, 0.85, 0.95] if calibration_parameters_control is None: calibration_parameters_control = { "calibrate_param1": "melt_f", "calibrate_param2": "prcp_fac", "calibrate_param3": "temp_bias", } # define what should be the default settings we use as first guess for # control calibration gdir.settings_filesuffix = first_guess_settings prcp_fac_fg = gdir.settings["prcp_fac"] melt_f_fg = gdir.settings["melt_f"] temp_bias_fg = gdir.settings["temp_bias"] # conduct a control calibration, this is the approach without MCS control_filesuffix = f"_{calibration_filesuffix}_control" workflow_output = workflow.execute_entity_task( tasks.mb_calibration_from_scalar_mb, gdir, settings_filesuffix=control_filesuffix, observations_filesuffix=control_filesuffix, ref_mb=ref_mb, ref_mb_unit=ref_mb_unit, ref_mb_err=ref_mb_err, ref_mb_period=ref_mb_period, overwrite_gdir=True, **calibration_parameters_control, prcp_fac=prcp_fac_fg, melt_f=melt_f_fg, temp_bias=temp_bias_fg, mb_model_class=mb_model_class, filename="climate_historical", input_filesuffix=climate_input_filesuffix, return_mb_model=True, ) # the finally calibrated mass balance model is returned because of # return_mb_model=True mb_model_control = workflow_output[0][1] # now sample parameters of control calibration if mcs_sampling_settings is None: mcs_sampling_settings = {} elif not isinstance(mcs_sampling_settings, dict): raise TypeError("MCS sampling settings must be a dictionary.") parameter_sample, parameter_names = self.get_mcs_parameter_sample( gdir=gdir, control_filesuffix=control_filesuffix, **mcs_sampling_settings ) # now conduct MCS by execute calibration for each sample gdir.observations_filesuffix = control_filesuffix ref_mb_control = gdir.observations["ref_mb"] if show_log: print( f"{calibration_filesuffix}:\n" f" Starting Monte Carlo Simulation for " f"{parameter_sample.shape[0]} ensemble members." ) # prepare the settings for individual mcs runs for multiprocessing, # define here kwargs which are individual for each run all_mcs_runs = [] all_param_filesuffix = [] for i, param_val in enumerate(parameter_sample): param_filesuffix = f"_{calibration_filesuffix}_{i}" all_param_filesuffix.append(param_filesuffix) all_mcs_runs.append( ( gdir, dict( settings_filesuffix=param_filesuffix, observations_filesuffix=param_filesuffix, ref_mb=param_val[parameter_names.index("ref_mb")], prcp_fac=param_val[parameter_names.index("prcp_fac")], melt_f=param_val[parameter_names.index("melt_f")], temp_bias=param_val[parameter_names.index("temp_bias")], ), ) ) # execute mcs runs with execute_entity_task for multiprocessing, # kwargs defined below are the same for all runs old_continue_one_error = cfg.PARAMS["continue_on_error"] cfg.PARAMS["continue_on_error"] = True mcs_out = workflow.execute_entity_task( mcs_mb_calibration_workflow, all_mcs_runs, settings_parent_filesuffix=control_filesuffix, mb_model_class=mb_model_class, filename="climate_historical", input_filesuffix=climate_input_filesuffix, ref_mb_period=ref_mb_control["period"], ref_mb_err=ref_mb_control["err"], ref_mb_unit=ref_mb_control["unit"], calibration_parameters=calibration_parameters_mcs, ) cfg.PARAMS["continue_on_error"] = old_continue_one_error if not multiprocessing_during_datacube_creation: # this is for testing a bug on the cluster old_multiprocessing = cfg.PARAMS["use_multiprocessing"] cfg.PARAMS["use_multiprocessing"] = False # here we keep only working mcs runs and their mb_models working_samples = [] mb_model_samples = [] for i, out in enumerate(mcs_out): if out is None: continue working_samples.append(all_param_filesuffix[i]) mb_model_samples.append(out) if show_log: print( f" Finished Monte Carlo Simulation: {len(working_samples)} " f"ensemble members for output aggregation available.\n" f" Start generating datacubes" ) datacube_dict = {} if isinstance(datacube_types, str): datacube_types = [datacube_types] for datacube_request in datacube_types: if show_log: print(f" Start generation of {datacube_request} datacube") if datacube_request in ["annual_hydro", "monthly"]: # define kwargs which are individual for each run all_sample_runs = [] for sample_filesuffix in working_samples + [control_filesuffix]: all_sample_runs.append( ( gdir, dict( settings_filesuffix=sample_filesuffix, output_filesuffix=f"{sample_filesuffix}_{datacube_request}", ), ) ) if datacube_request == "annual_hydro": datacube_vars = [ "volume", "area", "length", "off_area", "on_area", "melt_off_glacier", "melt_on_glacier", "liq_prcp_off_glacier", "liq_prcp_on_glacier", "snowfall_off_glacier", "snowfall_on_glacier", "melt_off_glacier_monthly", "melt_on_glacier_monthly", "liq_prcp_off_glacier_monthly", "liq_prcp_on_glacier_monthly", "snowfall_off_glacier_monthly", "snowfall_on_glacier_monthly", "runoff_monthly", "runoff", "specific_mb", "runoff_monthly_cumulative", ] # run dynamic model with different calibration options # kwargs which are the same for all runs, can be provided directly workflow.execute_entity_task( tasks.run_with_hydro, all_sample_runs, run_task=tasks.run_from_climate_data, fixed_geometry_spinup_yr=2000, ref_area_yr=2000, ys=2000, store_monthly_hydro=True, mb_model_class=mb_model_class, climate_filename="climate_historical", climate_input_filesuffix=climate_input_filesuffix, ) elif datacube_request == "monthly": datacube_vars = [ "volume", "area", "length", "specific_mb", "specific_mb_calendar_cum", ] # run dynamic model with different calibration options # kwargs which are the same for all runs, can be provided directly workflow.execute_entity_task( tasks.run_from_climate_data, all_sample_runs, fixed_geometry_spinup_yr=2000, ys=2000, store_monthly_step=True, mb_elev_feedback="monthly", mb_model_class=mb_model_class, climate_filename="climate_historical", climate_input_filesuffix=climate_input_filesuffix, ) # create a xarray datacube including all ensemble members if datacube_request in ["annual_hydro", "monthly"]: # compile run outputs and merge ds_all = [] # open each ensemble member and add a new coordinate for later # merging for sample_filesuffix, mb_model in zip( working_samples + [control_filesuffix], mb_model_samples + [mb_model_control], ): ds_tmp = utils.compile_run_output( gdir, input_filesuffix=f"{sample_filesuffix}_{datacube_request}" ) if datacube_request == "annual_hydro": ds_tmp = self.calculate_and_add_total_runoff(ds_tmp) cum_var = "runoff_monthly_cumulative" var = "runoff_monthly" ds_tmp[cum_var] = ds_tmp[var].cumsum(dim="month_2d") ds_tmp[cum_var].attrs[ "description" ] = f"Cumulative {ds_tmp[var].attrs['description']}" ds_tmp[cum_var].attrs["unit"] = "kg" mb_unit = "mm w.e. yr-1" time_resolution = "annual" else: mb_unit = "mm w.e. month-1" time_resolution = "monthly" fls = gdir.read_pickle("inversion_flowlines") ds_tmp = self.add_specific_mb( ds_tmp, mb_model=mb_model, fls=fls, time_resolution=time_resolution, unit=mb_unit, ) if datacube_request == "monthly": # add cumulative ouput ds_tmp = self.add_annual_cumulative(ds_tmp, var="specific_mb") ds_tmp["specific_mb_calendar_cum"].attrs["unit"] = "mm w.e." ds_tmp = ds_tmp[datacube_vars] # keep only data from the orignal climate file (we need full # calendar years to work within OGGM) if datacube_request == "monthly": ds_tmp = ds_tmp.sel( time=slice( None, utils.date_to_floatyear( y=last_available_date.year, m=last_available_date.month, ), ) ) elif datacube_request == "annual_hydro": ds_tmp = mask_mixed_year_month( ds=ds_tmp, last_available=last_available_date ) else: raise NotImplementedError(f"{datacube_request}") # add resulting dataset for merging if sample_filesuffix == control_filesuffix: ds_control = ds_tmp.expand_dims(member=["Control"]) ds_tmp.coords["sample_name"] = sample_filesuffix ds_tmp = ds_tmp.expand_dims("sample_name") ds_all.append(ds_tmp) # merge all ensemble members into a single dataset ds_all = xr.combine_by_coords( ds_all, fill_value=np.nan, combine_attrs="override" ) elif datacube_request == "daily_smb": # define daily float year timeseries y1 = int(mb_model_control.ye) mb_years = utils.float_years_timeseries( y0=2000, y1=y1, include_last_year=True, daily=True ) # loop through all ensemble members and get specific mb smb_all = [] if save_snowline: snowline_all = [] fls = gdir.read_pickle("inversion_flowlines") for sample_filesuffix, mb_model in zip( working_samples + [control_filesuffix], mb_model_samples + [mb_model_control], ): smb_tmp = mb_model.get_specific_mb( fls=fls, year=mb_years, time_resolution="daily" ) if save_snowline: # use isclose to avoid rounding errors snowline_yr = np.isclose( mb_model.snowline_year[:, None], mb_years, rtol=1e-10 ).any(axis=1) assert snowline_yr.sum() == len(mb_years) snowline_tmp = mb_model.snowline[snowline_yr] # set inf values for quantile computation inf_values = mb_model.snowline_inf_values if np.inf in inf_values: snowline_tmp = np.where( np.isposinf(snowline_tmp), inf_values[np.inf], snowline_tmp, ) if -np.inf in inf_values: snowline_tmp = np.where( np.isneginf(snowline_tmp), inf_values[-np.inf], snowline_tmp, ) if sample_filesuffix == control_filesuffix: smb_control = smb_tmp if save_snowline: snowline_control = snowline_tmp smb_all.append(smb_tmp) if save_snowline: snowline_all.append(snowline_tmp) # combine all ensemble members in a dataset for later quantile # computation ds_all = xr.Dataset( data_vars=dict( specific_mb=(["sample_name", "time"], np.array(smb_all)), ), coords=dict( sample_name=working_samples + [control_filesuffix], time=mb_years, ), ) ds_all["time"].attrs = {"description": "Floating year"} ds_all = self.add_specific_mb_attrs(ds_all, unit="mm w.e. day-1") if save_snowline: ds_all["snowline"] = ( ("sample_name", "time"), np.array(snowline_all), ) ds_all = self.add_snowline_attrs(ds_all, inf_values=inf_values) ds_control = xr.Dataset( data_vars=dict(specific_mb=(["member", "time"], [smb_control])), coords=dict(member=["Control"], time=mb_years), ) ds_control["time"].attrs = {"description": "Floating year"} ds_control = self.add_specific_mb_attrs( ds_control, unit="mm w.e. day-1" ) if save_snowline: ds_control["snowline"] = (("member", "time"), [snowline_control]) ds_control = self.add_snowline_attrs( ds_control, inf_values=inf_values ) # add cumulative ouput ds_all = self.add_annual_cumulative(ds_all, var="specific_mb") ds_control = self.add_annual_cumulative(ds_control, var="specific_mb") ds_all["specific_mb_calendar_cum"].attrs["unit"] = "mm w.e." ds_control["specific_mb_calendar_cum"].attrs["unit"] = "mm w.e." # keep only data from the orignal climate file (we need full # calendar years to work within OGGM) last_available_floatyear = utils.date_to_floatyear( y=last_available_date.year, m=last_available_date.month, d=last_available_date.day, ) ds_all = ds_all.sel(time=slice(None, last_available_floatyear)) ds_control = ds_control.sel(time=slice(None, last_available_floatyear)) else: raise NotImplementedError(f"{datacube_request}") # calculate quantiles and rename, this is the same for all datacubes with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message="All-NaN slice encountered", category=RuntimeWarning, ) ds_all = ds_all.quantile(q=quantiles, dim="sample_name") ds_all = ds_all.rename({"quantile": "member"}) # convert float quantile values to str, for mixing with 'Control' # make sure to match ordering labels = [str(v) for v in ds_all.member.values] ds_all = ds_all.assign_coords(member=("member", labels)) # add control run as an extra member ds_all = xr.concat([ds_control, ds_all], dim="member") ds_all.coords["member"].attrs = { "description": "Members include Monte Carlo ensemble percentiles " f"(calculated out of {len(working_samples)} " "ensemble members) and a Control run calibrated " "using median input values." } # convert placeholder inf values back for snowline if "snowline" in ds_all: if np.inf in inf_values: ds_all["snowline"] = xr.where( ds_all["snowline"] != inf_values[np.inf], ds_all["snowline"], np.inf, keep_attrs=True, ) if -np.inf in inf_values: ds_all["snowline"] = xr.where( ds_all["snowline"] != inf_values[-np.inf], ds_all["snowline"], -np.inf, keep_attrs=True, ) # add RGI-ID attribute ds_all.attrs["RGI-ID"] = self.RGI_ID_attrs # add calibration strategy ds_all.attrs["calibration_strategy"] = calibration_strategy datacube_dict[datacube_request] = ds_all if show_log: print(f" Finished generation of {datacube_request} datacube") # we save the original climate data back to the gdir with xr.open_dataset(fp_climate.format("_original")) as ds_clim: ds_clim = ds_clim.load() ds_clim.to_netcdf(fp_climate.format("")) os.remove(fp_climate.format("_original")) if show_log: print(f" Finished generating datacubes\n") if not multiprocessing_during_datacube_creation: cfg.PARAMS["use_multiprocessing"] = old_multiprocessing return datacube_dict
[docs] def get_ref_mb( self, gdir: GlacierDirectory, ref_mb_period: str = None, datacube_l1: xr.Dataset = None, source: str = "Hugonnet", ) -> tuple: """Get observed mass balance for a specific reference period. Parameters ---------- gdir datacube_l1: xr.Dataset If the reference mass balance should be derived from an observation data, which is available in a L1 datacube. ref_mb_period: str Reference calibration period in the format <YYYY-MM-DD_YYYY-MM-DD>. source: str Define the observation source Returns ------- tuple ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period: the reference mass balance to match with unit, associated uncertainty and valid period. """ if source == "Hugonnet": if gdir is None: gdir = self.gdir pd_geodetic = utils.get_geodetic_mb_dataframe() df_ref_mb = pd_geodetic.loc[gdir.rgi_id] df_ref_mb = df_ref_mb.loc[df_ref_mb.period == ref_mb_period] ref_mb = df_ref_mb.dmdtda.item() * 1000 ref_mb_unit = "kg m-2 yr-1" ref_mb_err = df_ref_mb.err_dmdtda.item() * 1000 return ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period else: raise NotImplementedError(f"Source {source} is not implemented.")
[docs] def calibrate( self, gdir: GlacierDirectory, model_matrix: dict = None, datacube_l1: xr.Dataset = None, ignore_errors: bool = False, **kwargs, ) -> dict: """Calibrate the mass balance model and create L2 datacubes using a model matrix. Parameters ---------- gdir: GlacierDirectory Glacier directory. model_matrix: dict Model parameters for various calibration runs. datacube_l1: xr.Dataset If the reference mass balance should be derived from an observation data, which is available in a L1 datacube. ignore_errors: bool If True any exception raised during the calibration is ignored. kwargs: dict kwargs passed on to Calibrator.get_calibrated_datacubes Returns ------- dict Containing the created L2 datacubes for each set of configuration parameters in ``model_matrix`` """ # Store results datacubes_l2 = {} if datacube_l1 is None: datacube_l1 = self.datacube_l1 if model_matrix is None: model_matrix = self.model_matrix for matrix_name, model_params in tqdm(model_matrix.items()): try: mb_model_class = model_params["model"] ref_mb_period = model_params["ref_mb_period"] source = model_params["source"] calibration_filesuffix = f"{matrix_name}_{ref_mb_period}" ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period = self.get_ref_mb( gdir=gdir, datacube_l1=datacube_l1, ref_mb_period=ref_mb_period, source=source, ) if source == "Hugonnet": source_description = "Hugonnet et al. (2021)" else: source_description = source if isinstance(mb_model_class, partial): mb_model_name = mb_model_class.func.__name__ else: mb_model_name = mb_model_class.__name__ calibration_strategy = ( f"OGGM mass-balance model '{mb_model_name}' " f"calibrated to match data from {source_description} in the " f"period {ref_mb_period}." ) datacubes_l2[matrix_name] = self.get_calibrated_datacubes( gdir=gdir, mb_model_class=mb_model_class, ref_mb=ref_mb, ref_mb_err=ref_mb_err, ref_mb_unit=ref_mb_unit, ref_mb_period=ref_mb_period, calibration_filesuffix=calibration_filesuffix, calibration_strategy=calibration_strategy, **kwargs, ) except Exception as e: if not ignore_errors: raise print(f"Calibration for {matrix_name} not successful: {e}") return datacubes_l2
[docs] @entity_task(log) def mcs_mb_calibration_workflow( gdir, settings_filesuffix, settings_parent_filesuffix, observations_filesuffix, mb_model_class, filename, input_filesuffix, ref_mb, ref_mb_period, ref_mb_err, ref_mb_unit, prcp_fac, melt_f, temp_bias, calibration_parameters=None, ): """ This defines the calibration workflow executed for each ensemble member of the Monte Carlo simulation. Parameters ---------- gdir settings_filesuffix settings_parent_filesuffix: str the settings where from where we use the bounds for the mb parameters prcp_fac, melt_f and temp_bias observations_filesuffix mb_model_class filename: str climatic filename input_filesuffix: str climatic filenamesuffix ref_mb ref_mb_period ref_mb_err ref_mb_unit prcp_fac melt_f temp_bias calibration_parameters: dict here you can define the order of calibration parameters, the default is {"calibrate_param1": "prcp_fac", "calibrate_param2": "melt_f", "calibrate_param3": "temp_bias",}. Returns ------- """ if calibration_parameters is None: calibration_parameters = { "calibrate_param1": "prcp_fac", "calibrate_param2": "melt_f", "calibrate_param3": "temp_bias", } # add observations data to observations file gdir.observations_filesuffix = observations_filesuffix gdir.observations["ref_mb"] = { "err": ref_mb_err, "period": ref_mb_period, "value": ref_mb, "unit": ref_mb_unit, } # get bounds from parent bounds_to_define = [ "prcp_fac_min", "prcp_fac_max", "melt_f_min", "melt_f_max", "temp_bias_min", "temp_bias_max", ] gdir.settings_filesuffix = settings_parent_filesuffix default_bounds = {} for bound in bounds_to_define: default_bounds[bound] = gdir.settings[bound] # create the new settings file with the correct parent_filesuffix utils.ModelSettings( gdir, filesuffix=settings_filesuffix, reset_parent_filesuffix=True ) gdir.settings_filesuffix = settings_filesuffix gdir.settings["use_winter_prcp_fac"] = False # set parent bounds for bound in bounds_to_define: gdir.settings[bound] = default_bounds[bound] # the settings here mimic mb_calibration_from_hugonnet_mb workflow_output = workflow.execute_entity_task( massbalance.mb_calibration_from_scalar_mb, gdir, settings_filesuffix=settings_filesuffix, observations_filesuffix=observations_filesuffix, overwrite_gdir=True, **calibration_parameters, prcp_fac=prcp_fac, melt_f=melt_f, temp_bias=temp_bias, mb_model_class=mb_model_class, filename=filename, input_filesuffix=input_filesuffix, return_mb_model=True, ) # the finally calibrated mass balance model is returned because of # return_mb_model=True mb_model = workflow_output[0][1] return mb_model
[docs] def extend_to_full_calendar_year( ds: xr.Dataset, time_dim: str = "time", resolution: str = "" ) -> tuple | xr.Dataset: """Extend an xarray Dataset along `time_dim` to the end of the last calendar year present. Any newly added timestamps are filled with the last available values (LOCF). Parameters ---------- ds : xr.Dataset time_dim : str, default "time" resolution : str, optional - daily: extend to YYYY-12-31 (daily frequency). - monthly: extend to YYYY-12-01 assuming monthly start timestamps (MS). """ if time_dim not in ds.coords: raise ValueError(f"Dataset has no coordinate '{time_dim}'.") t = ds[time_dim].to_index() if len(t) == 0: return ds if not resolution: # Infer from median step in days. >= ~28 -> monthly, else daily. dt_days = pd.Series(t).diff().dropna().dt.total_seconds() / 86400.0 med = float(dt_days.median()) if len(dt_days) else np.nan resolution = "monthly" if (np.isnan(med) or med >= 28) else "daily" last = t[-1] year_end = pd.Timestamp(year=last.year, month=12, day=31) if resolution.lower() == "daily": new_time = pd.date_range(t[0], year_end, freq="D") elif resolution.lower() == "monthly": # Monthly dataset assumed to be on month start (YYYY-MM-01). # Ends December 1. end_ms = pd.Timestamp(year=last.year, month=12, day=1) new_time = pd.date_range(t[0], end_ms, freq="MS") else: raise ValueError( "Accepted resolutions are: 'daily', 'monthly', or leave blank to infer." ) # Reindex introduces NaNs at new timestamps; ffill carries last observed value forward. out = ds.reindex({time_dim: new_time}).ffill(time_dim) return out, last
[docs] def mask_mixed_year_month(ds: xr.Dataset, last_available: pd.Timestamp) -> xr.Dataset: """ Parameters ---------- ds last_available Returns ------- """ last_year = last_available.year last_month = last_available.month if last_month < 12: year_mask = ds["time"] < last_year # dims: (time,) else: year_mask = ds["time"] <= last_year # dims: (time,) ym_mask = (ds["time"] < last_year) | ( (ds["time"] == last_year) & (ds["month_2d"] <= last_month) ) # ym_mask dims: (time, month_2d) out_vars = {} for name, da in ds.data_vars.items(): dims = set(da.dims) if "month_2d" in dims and "time" in dims: out_vars[name] = da.where(ym_mask) elif "time" in dims and "month_2d" not in dims: out_vars[name] = da.where(year_mask) else: # unchanged for variables not depending on year/month out_vars[name] = da return xr.Dataset(out_vars, coords=ds.coords, attrs=ds.attrs)
[docs] class CalibratorCryotempo(Calibrator):
[docs] def __init__(self, model_matrix: dict = None, datacube_l1=None, gdir=None): super().__init__(model_matrix=model_matrix, datacube_l1=datacube_l1, gdir=gdir)
[docs] def set_model_matrix( self, name: str = "SfcType_Cryosat", model=massbalance.SfcTypeTIModel, geo_period: str = "2011-01-01_2020-01-01", daily: bool = True, source: str = "CryoTEMPO-EOLIS", **kwargs, ): """Set model parameters for calibration. Parameters ---------- name : str Name of configuration parameters. Ideally this should match the settings filesuffix. model : oggm.core.massbalance.MassBalanceModel OGGM mass balance model class. geo_period : str Reference mass balance period in the format YYYY-MM-DD_YYYY-MM-DD. kwargs Additional arguments passed to the mass balance model class. """ return super().set_model_matrix( name, model, geo_period, daily=daily, source=source, **kwargs, )
[docs] def get_eolis_dates(self, ds: xr.Dataset) -> np.ndarray: """Get time index from CryoTEMPO-EOLIS dataset. Parameters ---------- ds : xr.Dataset CryoTEMPO-EOLIS dataset. Returns ------- np.ndarray Time index adjusted to UTC. """ if isinstance(ds.t.values[0], np.datetime64): out = pd.to_datetime(ds.t.values, utc=True).to_pydatetime() return np.array([dt.astimezone(tz.tzutc()) for dt in out]) else: # we assume the type is int return np.array([datetime.fromtimestamp(t, tz=UTC) for t in ds.t.values])
[docs] def get_eolis_mean_dh(self, ds: xr.Dataset) -> np.ndarray: """Get mean elevation change from EOLIS data. Parameters ---------- ds : xr.Dataset CryoTEMPO-EOLIS dataset. Returns ------- np.ndarray Time series of spatial mean of elevation change. """ mean_time_series = [ np.nanmean(elevation_change_map.where(ds.glacier_mask == 1)) for elevation_change_map in ds.eolis_gridded_elevation_change ] return np.array(mean_time_series)
[docs] def get_temporal_bounds( self, dates: ArrayLike, date_start: str, date_end: str ) -> tuple: """Get start and end dates of geodetic and observational periods. This matches the nearest available dates for irregular time indices. Parameters ---------- dates : array_like Time index. This may be irregularly spaced. date_start : str Start year of a desired period. date_end : str End year of a desired period. Returns ------- tuple[datetime] Nearest available start and end dates for observations. """ # EOLIS data is in 30-day periods, so get closest available date data_start = self.get_nearest_datetime(dates, date_start) data_end = self.get_nearest_datetime(dates, date_end) return data_start, data_end
[docs] def get_geodetic_mb_from_dataset( self, dataset: xr.Dataset, date_start="2011-01-01", date_end="2020-01-01", ) -> tuple: """Get the geodetic mass balance in kg m-2 from enhanced gridded data. Parameters ---------- dataset : xr.Dataset CryoTEMPO-EOLIS dataset. date_start : str, default "2011-01-01" Start of reference period. date_end : str, default "2020-01-01" End of reference period. Returns ------- tuple ref_mb, ref_mb_unit, ref_mb_err: the reference mass balance to match with unit and associated uncertainty. """ dates = self.get_eolis_dates(dataset) data_start, data_end = self.get_temporal_bounds( dates=dates, date_start=date_start, date_end=date_end ) ref_mb_period = ( f"{data_start.strftime('%Y-%m-%d')}_" f"{data_end.strftime('%Y-%m-%d')}" ) calib_frame = pd.DataFrame( { "dh": dataset.eolis_elevation_change_timeseries, "dh_sigma": dataset.eolis_elevation_change_sigma_timeseries, }, index=dates, ) # ref_mb in kg m-2, area not needed as we already have a mean dh # (dh = dV / A) bulk_density = 850 dh = calib_frame["dh"].loc[data_end] - calib_frame["dh"].loc[data_start] ref_mb = dh * bulk_density ref_mb_unit = "kg m-2" # Error propagation with correlation relative_dt = relativedelta(data_end, data_start) k = abs(relative_dt.years * 12 + relative_dt.months) correlation_coeff = np.maximum(0, 1 - k / 3) dh_err = np.sqrt( calib_frame["dh_sigma"].loc[data_start] ** 2 + calib_frame["dh_sigma"].loc[data_end] ** 2 - 2 * correlation_coeff * calib_frame["dh_sigma"].loc[data_start] * calib_frame["dh_sigma"].loc[data_end] ) ref_mb_err = dh_err * bulk_density return ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period
[docs] def get_ref_mb( self, gdir: GlacierDirectory = None, ref_mb_period: str = None, datacube_l1: xr.Dataset = None, source: str = "Hugonnet", ) -> tuple: """Get observed mass balance for a specific reference period. Parameters ---------- gdir datacube_l1: xr.Dataset If the reference mass balance should be derived from an observation data, which is available in a L1 datacube. ref_mb_period: str Reference calibration period in the format <YYYY-MM-DD_YYYY-MM-DD>. source: str Define the observation source Returns ------- tuple ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period: the reference mass balance to match with unit, associated uncertainty and valid period. """ if source == "CryoTEMPO-EOLIS": dates = ref_mb_period.split("_") date_start = dates[0] date_end = dates[1] if datacube_l1 is None: datacube_l1 = self.datacube_l1 ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period = ( self.get_geodetic_mb_from_dataset( dataset=datacube_l1, date_start=date_start, date_end=date_end, ) ) return ref_mb, ref_mb_unit, ref_mb_err, ref_mb_period else: return super().get_ref_mb( gdir=gdir, datacube_l1=datacube_l1, ref_mb_period=ref_mb_period, source=source, )