Source code for dtcg.api.external.call

"""Copyright 2025 DTCG Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


=====

Create and read DTCG requests.
"""

from pathlib import Path

import xarray as xr
import zarr
from zarr.errors import GroupNotFoundError

import dtcg.integration.oggm_bindings as oggm_bindings


[docs] class StreamDatacube:
[docs] def __init__(self, server="https://cluster.klima.uni-bremen.de/~dtcg/test_zarr/"): self.binder = oggm_bindings.BindingsCryotempo() self.binder.init_oggm() self.server = server
[docs] def stream_datacube( self, glacier: str, layer: str = "", region_name: str = "Iceland" ) -> xr.DataTree: """Stream datacube from server. Parameters ---------- glacier : str Name or RGI-ID of glacier. layer : str, optional Datacube layer. Default will load all available layers. region : str, default "Iceland" RGI region name. Returns ------- xr.DataTree Datacube or datacube layer. """ if glacier[:3] != "RGI": glacier_data = self.binder.get_rgi_files_from_subregion( region_name=region_name, subregion_name="" ) glacier = self.binder.get_glacier_by_name( glacier_data, glacier ).RGIId.values[0] stream_url = self.get_url(rgi_id=glacier) return xr.open_datatree( stream_url, group=layer, chunks={}, engine="zarr", consolidated=True, decode_cf=True, )
def get_zip_path(self, zip_path: Path, rgi_id: str): if not zip_path.suffix != ".zip": # zarr is a directory Path(zip_path).mkdir(exist_ok=True) zip_path = zip_path.with_suffix(".zip") zip_path = Path(f"{zip_path}/{rgi_id}.zarr.zip") return zip_path
[docs] def zip_datacube(self, rgi_id: str, zip_path: Path = ""): """Download and zip a datacube. Parameters ---------- stream_url : str URL to a zarr folder. rgi_id : str, optional RGI-ID of glacier. zip_path : Path, optional Output path for zip file. Returns ------- Path Path to output zipfile. """ if rgi_id: stream_url = self.get_url(rgi_id=rgi_id) try: zip_path = self.get_zip_path(zip_path=zip_path, rgi_id=rgi_id) if not zip_path.is_file(): store = zarr.storage.ZipStore(zip_path, mode="w") with xr.open_datatree( stream_url, group=None, chunks={}, engine="zarr", consolidated=True, decode_cf=True, ) as stream: stream.compute().to_zarr( store=store, mode="w", consolidated=True, zarr_format=2, # encoding=stream.encoding, ) except GroupNotFoundError as e: # pipe different error to frontends raise FileNotFoundError(f"Error when zipping: {e}") return zip_path
[docs] def get_url(self, rgi_id: str) -> str: """Get URL for a Zarr datacube. Parameters ---------- rgi_id : str Glacier RGI ID. Returns ------- str Server URL of Zarr datacube. """ url = f"{self.server}{rgi_id}.zarr/" return url