Source code for dtcg.api.external.call

"""Copyright 2025 DTCG Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


=====

Create and read DTCG requests.
"""

from pathlib import Path

import xarray as xr
import zarr
from zarr.errors import GroupNotFoundError

import dtcg.integration.oggm_bindings as oggm_bindings
from dtcg import DEFAULT_L1_DATACUBE_URL


[docs] class StreamDatacube: """Streams data cubes. Attributes ---------- server : str, default DEFAULT_L1_DATACUBE_URL Base URL for data cubes. binder : BindingsOggmModel Handles interaction between user and model. """
[docs] def __init__(self, server=DEFAULT_L1_DATACUBE_URL): self.server = server self.binder = oggm_bindings.BindingsCryotempo() self.binder.init_oggm()
[docs] def stream_datacube( self, glacier: str, layer: str = "", region_name: str = "Iceland" ) -> xr.DataTree: """Stream datacube from server. Parameters ---------- glacier : str Name or RGI-ID of glacier. layer : str, optional Datacube layer. Default will load all available layers. region : str, default "Iceland" RGI region name. Ignored if a valid RGI-ID is passed. Returns ------- xr.DataTree Datacube or datacube layer. """ if glacier[:3] != "RGI": glacier_data = self.binder.get_rgi_files_from_subregion( region_name=region_name, subregion_name="" ) glacier = self.binder.get_glacier_by_name( glacier_data, glacier ).RGIId.values[0] stream_url = self.get_url(rgi_id=glacier) return xr.open_datatree( stream_url, group=layer, chunks={}, engine="zarr", consolidated=True, decode_cf=True, )
def get_zip_path(self, zip_path: Path, rgi_id: str): if not zip_path.suffix != ".zip": # zarr is a directory Path(zip_path).mkdir(exist_ok=True) zip_path = zip_path.with_suffix(".zip") zip_path = Path(f"{zip_path}/{rgi_id}.zarr.zip") return zip_path
[docs] def zip_datacube(self, rgi_id: str, zip_path: Path = ""): """Download and zip a datacube. Parameters ---------- stream_url : str URL to a zarr folder. rgi_id : str, optional RGI-ID of glacier. zip_path : Path, optional Output path for zip file. Returns ------- Path Path to output zipfile. """ if rgi_id: stream_url = self.get_url(rgi_id=rgi_id) try: zip_path = self.get_zip_path(zip_path=zip_path, rgi_id=rgi_id) if not zip_path.is_file(): store = zarr.storage.ZipStore(zip_path, mode="w") with xr.open_datatree( stream_url, group=None, chunks={}, engine="zarr", consolidated=True, decode_cf=True, ) as stream: stream.compute().to_zarr( store=store, mode="w", consolidated=True, zarr_format=2, # encoding=stream.encoding, ) except GroupNotFoundError as e: # pipe different error to frontends raise FileNotFoundError(f"Error when zipping: {e}") return zip_path
[docs] def get_url(self, rgi_id: str) -> str: """Get URL for a Zarr datacube. Parameters ---------- rgi_id : str Glacier RGI ID. Returns ------- str Server URL of Zarr datacube. """ url = f"{self.server}{rgi_id}.zarr/" return url