
API Reference

climdata.utils.wrapper_workflow.ClimateExtractor

Climate data extraction and extreme-index workflow manager.

Provides a high-level API for:

- loading/configuring dataset providers via Hydra config
- uploading NetCDF/CSV content into xarray Datasets
- extracting data from supported providers (CMIP, DWD, MSWX, HYRAS, POWER)
- computing extreme indices using configured xclim indices
- converting datasets to long-form DataFrames and saving results

Attributes:

- cfg (DictConfig): Hydra configuration object describing dataset, region/time/variables, outputs.
- current_ds (xr.Dataset): The most recently loaded or extracted dataset.
- current_df (pd.DataFrame): The most recently produced long-form DataFrame.
- filename_csv / filename_nc / filename_zarr (str): Generated output filename templates/paths.

Examples:

extractor = ClimateExtractor(overrides=['dataset=cmip', 'region=europe'])
extractor.extract()
idx_ds = extractor.calc_index()
df = extractor.to_dataframe(idx_ds)
extractor.to_csv(df)
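
The same steps can be chained with run_workflow(); a minimal sketch, assuming the default action list and illustrative overrides:

extractor = ClimateExtractor(overrides=['dataset=cmip', 'region=europe'])
result = extractor.run_workflow(actions=['extract', 'calc_index', 'to_csv', 'to_nc'])
print(result.filename)  # path of the last file written (CSV or NetCDF)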

Source code in climdata/utils/wrapper_workflow.py
class ClimateExtractor:
    """Climate data extraction and extreme-index workflow manager.

    Provides a high-level API for:
        - loading/configuring dataset providers via Hydra config
        - uploading NetCDF/CSV content into xarray Datasets
        - extracting data from supported providers (CMIP, DWD, MSWX, HYRAS, POWER)
        - computing extreme indices using configured xclim indices
        - converting datasets to long-form DataFrames and saving results

    Attributes:
        cfg (DictConfig): Hydra configuration object describing dataset, region/time/variables, outputs.
        current_ds (xr.Dataset): The most recently loaded or extracted dataset.
        current_df (pd.DataFrame): The most recently produced long-form DataFrame.
        filename_csv/filename_nc/filename_zarr (str): Generated output filename templates/paths.

    Example:
        extractor = ClimateExtractor(overrides=['dataset=cmip', 'region=europe'])
        extractor.extract()
        idx_ds = extractor.calc_index()
        df = extractor.to_dataframe(idx_ds)
        extractor.to_csv(df)
    """

    def __init__(self, cfg_name="config", conf_path=None, overrides: Optional[List[str]] = None):
        """Initialize the workflow manager and load configuration.

        Args:
            cfg_name (str): Name of the Hydra configuration (default: "config").
            conf_path (str, optional): Optional config path override.
            overrides (list[str], optional): Hydra overrides to apply to the configuration.
        """
        self.cfg_name = cfg_name
        self.conf_path = conf_path
        self.cfg: Optional[DictConfig] = None

        # Stage datasets
        self.ds = None
        self.current_ds = None
        self.index_ds = None
        self.impute_ds = None
        self.bias_corrected_ds = None

        # Stage DataFrames
        self.raw_df = None
        self.current_df = None
        self.index_df = None
        self.impute_df = None
        self.bias_corrected_df = None
        self.df = None  # alias for current_df

        # filenames
        self.filename = None
        self.filetype = None

        # Automatically load config on init
        self.load_config(overrides)
        self.cfg = self.preprocess_aoi(self.cfg)

        # instance logger for this extractor
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

    def _gen_fn(self, *, ds: xr.Dataset = None, df: pd.DataFrame = None):
        """Create filenames (csv, nc, zarr) using config templates and dataset metadata.

        Accepts either:
        - ds : xarray.Dataset
        - df : pandas.DataFrame (long form with columns: lat, lon, time/date, variable, value, optional source)

        Exactly one must be provided (keyword-only).
        """

        # ------------------------
        # Validate inputs
        # ------------------------
        if (ds is None) == (df is None):
            raise ValueError("Provide exactly one of `ds` or `df` as a keyword argument.")

        # ------------------------
        # Helper: find coord alias (xarray)
        # ------------------------
        def find_coord(ds, names):
            for name in names:
                if name in ds.coords:
                    return ds[name]
            return None

        # ------------------------
        # Case 1: xarray.Dataset
        # ------------------------
        if ds is not None:
            lat = find_coord(ds, ["lat", "latitude"])
            lon = find_coord(ds, ["lon", "longitude"])
            time = find_coord(ds, ["time", "date"])

            provider = ds.attrs.get("source", "unknown")
            vars_list = list(ds.data_vars)
            parameter = vars_list[0] if len(vars_list) == 1 else "_".join(vars_list)

            # Latitude range
            if lat is not None:
                lat_vals = lat.values.reshape(-1)
                lat_min, lat_max = float(lat_vals.min()), float(lat_vals.max())
            else:
                lat_min = lat_max = None

            # Longitude range
            if lon is not None:
                lon_vals = lon.values.reshape(-1)
                lon_min, lon_max = float(lon_vals.min()), float(lon_vals.max())
            else:
                lon_min = lon_max = None

            # Time range
            if time is not None:
                tvals = pd.to_datetime(time.values)
                start, end = tvals.min().strftime("%Y-%m-%d"), tvals.max().strftime("%Y-%m-%d")
            else:
                start = end = "unknown"

        # ------------------------
        # Case 2: pandas.DataFrame (long form)
        # ------------------------
        else:
            cols = df.columns.astype(str)

            # Identify coordinate columns
            lat_cols = [c for c in cols if "lat" in c.lower()]
            lon_cols = [c for c in cols if "lon" in c.lower()]
            time_cols = [c for c in cols if "time" in c.lower() or "date" in c.lower()]

            # Provider from 'source' column
            if "source" in df.columns:
                unique_sources = df["source"].dropna().unique()
                provider = unique_sources[0] if len(unique_sources) == 1 else "_".join(map(str, unique_sources))
            else:
                provider = "unknown"

            # Unique parameters from 'variable' column
            if "variable" in df.columns:
                unique_parameters = sorted(df["variable"].dropna().unique())
                parameter = unique_parameters[0] if len(unique_parameters) == 1 else "_".join(unique_parameters)
            else:
                parameter = "unknown"

            # Latitude range
            if lat_cols:
                lat_vals = pd.to_numeric(df[lat_cols[0]], errors="coerce")
                lat_min, lat_max = float(lat_vals.min()), float(lat_vals.max())
            else:
                lat_min = lat_max = None

            # Longitude range
            if lon_cols:
                lon_vals = pd.to_numeric(df[lon_cols[0]], errors="coerce")
                lon_min, lon_max = float(lon_vals.min()), float(lon_vals.max())
            else:
                lon_min = lon_max = None

            # Time range
            if time_cols:
                tvals = pd.to_datetime(df[time_cols[0]], errors="coerce")
                start = tvals.min().strftime("%Y-%m-%d")
                end = tvals.max().strftime("%Y-%m-%d")
            else:
                start = end = "unknown"

        # ------------------------
        # Format lat/lon strings
        # ------------------------
        if lat_min is None:
            lat_str = lat_range = "unknown"
        elif lat_min == lat_max:
            lat_str = lat_range = f"{lat_min}"
        else:
            lat_str = f"{lat_min}_{lat_max}"
            lat_range = f"{lat_min}-{lat_max}"

        if lon_min is None:
            lon_str = lon_range = "unknown"
        elif lon_min == lon_max:
            lon_str = lon_range = f"{lon_min}"
        else:
            lon_str = f"{lon_min}_{lon_max}"
            lon_range = f"{lon_min}-{lon_max}"

        # ------------------------
        # Build filenames
        # ------------------------
        outdir = Path(self.cfg.output.out_dir)
        outdir.mkdir(parents=True, exist_ok=True)
        def build(fn_template):
            return fn_template.format(
                provider=provider,
                parameter=parameter,
                lat=lat_str,
                lon=lon_str,
                start=start,
                end=end,
                lat_range=lat_range,
                lon_range=lon_range,
            )

        self.filename_csv = str(outdir / build(self.cfg.output.filename_csv))
        self.filename_nc = str(outdir / build(self.cfg.output.filename_nc))
        self.filename_zarr = str(outdir / build(self.cfg.output.filename_zarr))
        return self

    def _gen_fn_cfg(self):
        """Generate output filenames using configuration and extracted dataset metadata.

        Uses settings from ``self.cfg`` and ``self.current_ds`` (if available) to build filename templates.
        """

        cfg = self.cfg
        out = cfg.output
        provider = cfg.dataset.lower()
        if self.current_ds is not None:
            if len(self.current_ds.data_vars) == 0:
                parameter = "unknown"
            elif len(self.current_ds.data_vars) == 1:
                parameter = next(iter(self.current_ds.data_vars))
            else:
                parameter = "_".join(self.current_ds.data_vars)
        else:
            parameter = "_".join(self.cfg.variables)
        # --------------------------------
        # Determine lat/lon values
        # --------------------------------
        if cfg.lat is not None and cfg.lon is not None:
            lat_range = lon_range = None   # single point
            lat_str = str(cfg.lat)
            lon_str = str(cfg.lon)
        elif cfg.region is not None:
            b = cfg.bounds[cfg.region]
            lat_min, lat_max = b["lat_min"], b["lat_max"]
            lon_min, lon_max = b["lon_min"], b["lon_max"]

            lat_str = f"{lat_min}_{lat_max}"
            lon_str = f"{lon_min}_{lon_max}"
            lat_range = f"{lat_min}-{lat_max}"
            lon_range = f"{lon_min}-{lon_max}"
        else:
            # Fallback for undefined regions
            lat_str = "unknown"
            lon_str = "unknown"
            lat_range = None
            lon_range = None

        # --------------------------------
        # Time range from cfg
        # --------------------------------
        start = pd.to_datetime(cfg.time_range.start_date).strftime("%Y-%m-%d")
        end = pd.to_datetime(cfg.time_range.end_date).strftime("%Y-%m-%d")

        # --------------------------------
        # Format filenames
        # --------------------------------
        def format_template(template):
            return template.format(
                provider=provider,
                parameter=parameter,
                lat=lat_str,
                lon=lon_str,
                lat_or_lat_range=lat_range or lat_str,
                lon_or_lon_range=lon_range or lon_str,
                start=start,
                end=end,
            )

        out_dir = Path(self.cfg.output.out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)

        self.filename_csv = str(out_dir / format_template(out.filename_csv))
        self.filename_nc = str(out_dir / format_template(out.filename_nc))
        self.filename_zarr = str(out_dir / format_template(out.filename_zarr))

    # ----------------------------
    # Hydra config
    # ----------------------------
    def load_config(self, overrides: Optional[List[str]] = None) -> DictConfig:
        """Load and compose the Hydra configuration.

        Args:
            overrides (list[str], optional): Hydra overrides to apply when composing the configuration.

        Returns:
            DictConfig: Composed Hydra configuration object stored on ``self.cfg``.
        """
        overrides = overrides or []
        conf_dir = _ensure_local_conf()
        rel_conf_dir = os.path.relpath(conf_dir, os.path.dirname(__file__))

        if not GlobalHydra.instance().is_initialized():
            hydra_ctx = initialize(config_path=rel_conf_dir, version_base=None)
        else:
            hydra_ctx = None

        if hydra_ctx:
            with hydra_ctx:
                self.cfg = compose(config_name=self.cfg_name, overrides=overrides)
        else:
            self.cfg = compose(config_name=self.cfg_name, overrides=overrides)
        return self.cfg

    # ----------------------------
    # AOI preprocessing
    # ----------------------------
    def preprocess_aoi(self, cfg: DictConfig) -> DictConfig:
        """Process an 'aoi' specification in the configuration.

        Supports GeoJSON strings or dictionaries for FeatureCollection, Feature, or simple geometry objects (Point/Polygon).

        Args:
            cfg (DictConfig): Configuration object with optional ``aoi`` entry.

        Returns:
            DictConfig: The modified configuration. When a Point is provided, ``cfg.lat`` and ``cfg.lon`` are set; when a Polygon is provided, ``cfg.bounds`` is set and ``cfg.region`` is set to "custom".
        """
        if not hasattr(cfg, "aoi") or cfg.aoi is None:
            return cfg

        if isinstance(cfg.aoi, str):
            try:
                cfg.aoi = json.loads(cfg.aoi)
            except json.JSONDecodeError:
                raise ValueError("Invalid AOI JSON string")

        aoi = cfg.aoi

        if aoi.get("type") == "FeatureCollection":
            geom = shape(aoi["features"][0]["geometry"])
        elif aoi.get("type") == "Feature":
            geom = shape(aoi["geometry"])
        elif "type" in aoi:
            geom = shape(aoi)
        else:
            raise ValueError(f"Unsupported AOI format: {aoi}")

        if isinstance(geom, Point):
            cfg.lat = geom.y
            cfg.lon = geom.x
            cfg.bounds = None
        elif isinstance(geom, Polygon):
            minx, miny, maxx, maxy = geom.bounds
            cfg.bounds = {"custom": {"lat_min": miny, "lat_max": maxy,
                                     "lon_min": minx, "lon_max": maxx}}
            cfg.region = "custom"
            cfg.lat = None
            cfg.lon = None
        else:
            raise ValueError(f"Unknown geometry type {geom.geom_type}")

        return cfg

    # ----------------------------
    # Upload NetCDF
    # ----------------------------
    @update_ds(attr_name='ds')
    def upload_netcdf(self, nc_file: str) -> xr.Dataset:
        """Load a NetCDF file into an xarray.Dataset and update file metadata.

        Args:
            nc_file (str): Path to the NetCDF file to open.

        Returns:
            xr.Dataset: The loaded dataset (also sets ``self.current_ds``).
        """
        if not os.path.exists(nc_file):
            raise FileNotFoundError(f"{nc_file} does not exist")

        ds = xr.open_dataset(nc_file)

        # Update cfg variables & varinfo
        if not hasattr(self.cfg, "variables") or not self.cfg.variables:
            self.cfg.variables = list(ds.data_vars)
        if not hasattr(self.cfg, "varinfo") or not self.cfg.varinfo:
            self.cfg.varinfo = {v: {"units": ds[v].attrs.get("units", "unknown")}
                                for v in ds.data_vars}
        self._gen_fn(ds=ds)
        return ds

    # ----------------------------
    # Upload CSV → xarray.Dataset
    # ----------------------------
    @update_ds(attr_name='ds')
    def upload_csv(self, csv_file: str) -> xr.Dataset:
        """Load a long-form CSV into an xarray.Dataset.

        The CSV must contain ``time`` and ``lat``/``latitude``, ``lon``/``longitude``, ``variable``, ``value``. Units may be supplied in a ``units`` column and an optional ``source`` column is recognized.

        Args:
            csv_file (str): Path to the CSV file to load.

        Returns:
            xr.Dataset: The converted dataset (also sets ``self.current_ds``).
        """
        if not os.path.exists(csv_file):
            raise FileNotFoundError(f"{csv_file} does not exist")

        df = pd.read_csv(csv_file, parse_dates=["time"])

        lat_col = next((c for c in ["lat", "latitude"] if c in df.columns), None)
        lon_col = next((c for c in ["lon", "longitude"] if c in df.columns), None)
        if lat_col is None or lon_col is None:
            raise ValueError("CSV must have 'lat'/'latitude' and 'lon'/'longitude' columns")

        id_vars = ["time", lat_col, lon_col]
        df_wide = df.pivot_table(index=id_vars, columns="variable", values="value").reset_index()
        ds = df_wide.set_index(id_vars).to_xarray()

        # Attach units from CSV ('units' column is optional)
        for var in ds.data_vars:
            if "units" in df.columns:
                units_series = df.loc[df["variable"] == var, "units"].dropna()
                ds[var].attrs["units"] = units_series.iloc[0] if not units_series.empty else "unknown"
            else:
                ds[var].attrs["units"] = "unknown"

        # Global source attribute
        if "source" in df.columns:
            source_series = df["source"].dropna().unique()
            if len(source_series) > 0:
                ds.attrs["source"] = source_series[0]

        # Update cfg variables & varinfo
        if not hasattr(self.cfg, "variables") or not self.cfg.variables:
            self.cfg.variables = list(ds.data_vars)
        if not hasattr(self.cfg, "varinfo") or not self.cfg.varinfo:
            self.cfg.varinfo = {v: {"units": ds[v].attrs.get("units", "unknown")} for v in ds.data_vars}
        self._gen_fn(ds=ds)
        return ds

    # ----------------------------
    # Extract data from datasets like CMIP, DWD, etc.
    # ----------------------------
    @update_ds(attr_name='ds')
    def extract(self) -> xr.Dataset:
        """Extract data from the configured provider using ``self.cfg``.

        Uses provider-specific classes (e.g., ``CMIP``, ``DWD``, ``MSWX``, ``HYRAS``, ``POWER``)
        to fetch, load and extract datasets. When extraction completes, units are converted to those declared in ``cfg.varinfo``, the dataset is computed, and filenames are generated from the configuration.

        Returns:
            xr.Dataset: The extracted and computed dataset (also sets ``self.current_ds``).
        """
        cfg = self.cfg
        extract_kwargs = {}

        if cfg.lat is not None and cfg.lon is not None:
            extract_kwargs["point"] = (cfg.lon, cfg.lat)
            if cfg.dataset.upper() == "DWD":
                extract_kwargs["buffer_km"] = 30
        elif cfg.region is not None:
            extract_kwargs["box"] = cfg.bounds[cfg.region]
        elif cfg.shapefile is not None:
            extract_kwargs["shapefile"] = cfg.shapefile

        ds = None
        dataset_upper = cfg.dataset.upper()

        if dataset_upper == "MSWX":
            ds_vars = []
            for var in cfg.variables:
                mswx = climdata.MSWX(cfg)
                mswx.extract(**extract_kwargs)
                mswx.load(var)
                ds_vars.append(mswx.dataset)
            ds = xr.merge(ds_vars)
            self.dataset_class = mswx
        elif dataset_upper == "CMIP":
            cmip = climdata.CMIP(cfg)
            cmip.fetch()
            cmip.load()
            cmip.extract(**extract_kwargs)
            ds = cmip.ds
            self.dataset_class = cmip
        elif dataset_upper == "POWER":
            power = climdata.POWER(cfg)
            power.fetch()
            power.load()
            ds = power.ds
            self.dataset_class = power
        elif dataset_upper == "DWD":
            ds_vars = []
            for var in cfg.variables:
                dwd = climdata.DWD(cfg)
                ds_var = dwd.extract(variable=var, **extract_kwargs)
                ds_vars.append(ds_var)
            ds = xr.merge(ds_vars)
            self.dataset_class = dwd
        elif dataset_upper == "HYRAS":
            hyras = climdata.HYRAS(cfg)
            ds_vars = []
            for var in cfg.variables:
                hyras.extract(**extract_kwargs)
                ds_vars.append(hyras.load(var, chunking={'time':"auto"})[[var]])
            ds = xr.merge(ds_vars, compat="override")
            self.dataset_class = hyras
        elif dataset_upper == "HOSTRADA":
            hostrada = climdata.HOSTRADA(cfg)
            hostrada.extract(**extract_kwargs)
            ds_vars = []
            for var in cfg.variables:
                ds_vars.append(hostrada.load(var, chunking={'time': 'auto'})[[var]])
            ds = xr.merge(ds_vars, compat="override")
            self.dataset_class = hostrada
        elif dataset_upper == "W5E5":
            w5e5 = climdata.W5E5(cfg)
            w5e5.fetch()  # Download from ISIMIP
            w5e5.load()   # Load into xarray
            w5e5.extract(**extract_kwargs)
            ds = w5e5.ds
            self.dataset_class = w5e5
        elif dataset_upper == "CMIP_W5E5":
            cmip_w5e5 = climdata.CMIPW5E5(cfg)
            cmip_w5e5.fetch()  # Download CMIP6 data from ISIMIP
            cmip_w5e5.load()   # Load into xarray
            cmip_w5e5.extract(**extract_kwargs)
            ds = cmip_w5e5.ds
            self.dataset_class = cmip_w5e5
        elif dataset_upper == "NEXGDDP":
            nexgddp = climdata.NEXGDDP(cfg)
            nexgddp.fetch()  # Download NEX-GDDP-CMIP6 data from NASA THREDDS
            nexgddp.load()   # Load into xarray
            nexgddp.extract(**extract_kwargs)
            ds = nexgddp.ds
            self.dataset_class = nexgddp
        elif dataset_upper == "AGRI_ISIMIP":
            agri = climdata.AGRI_ISIMIP(cfg)
            agri.fetch()  # Download agricultural data from ISIMIP
            ds_vars = []
            for var in cfg.variables:
                ds_vars.append(agri.load(var, chunking={'time': 'auto'})[[var]])
            ds = xr.merge(ds_vars, compat="override")
            agri.extract(**extract_kwargs)
            self.dataset_class = agri
        else:
            raise ValueError(f"Unsupported dataset provider: {cfg.dataset}")

        # Convert each variable to the units declared in cfg.varinfo
        for var in ds.data_vars:
            ds[var] = xclim.core.units.convert_units_to(ds[var], cfg.varinfo[var].units)

        # ds = ds.compute()

        return ds

    # ----------------------------
    # Compute extreme index
    # ----------------------------
    @update_ds(attr_name='index_ds')
    def calc_index(self, ds: xr.Dataset = None) -> xr.Dataset:
        """Calculate the configured extreme index using xclim indices.

        Args:
            ds (xr.Dataset, optional): Dataset to operate on. If ``None``, ``self.current_ds`` is used.

        Returns:
            xr.Dataset: The computed index as an xarray Dataset (also sets ``self.index_ds``).
        """
        cfg = self.cfg

        # Use provided ds or fallback
        ds = ds or self.current_ds
        if ds is None:
            raise ValueError("No dataset provided and no current_ds is available.")

        if cfg.index is None:
            self.logger.info("No index selected.")
            return None

        if "time" in ds.coords:
            # Handle both numpy datetime64 and cftime datetime objects
            time_values = ds.time.values
            try:
                # Try pandas conversion first (works for datetime64)
                years = pd.to_datetime(time_values).year
            except (TypeError, ValueError):
                # Fall back to cftime handling
                years = np.array([t.year for t in time_values])

            n_years = len(pd.unique(years))
            if n_years < 30:
                warnings.warn(f"Index {cfg.index} usually requires ≥30 years, got {n_years}", UserWarning)

        indices = extreme_index(cfg, ds)
        index_ds = indices.calculate(cfg.index).compute()
        index_ds = index_ds.to_dataset(name=cfg.index)

        return index_ds

    # ----------------------------
    # Dataset → Long-form DataFrame
    # ----------------------------
    @update_df()
    def to_dataframe(self, ds: xr.Dataset = None) -> pd.DataFrame:
        """Convert a dataset to a long-form pandas DataFrame.

        The output contains columns: time, lat, lon (or latitude/longitude), variable, value, units, source.

        Args:
            ds (xr.Dataset, optional): Dataset to convert. If ``None``, uses ``self.current_ds``.

        Returns:
            pd.DataFrame: Long-form DataFrame (also sets ``self.current_df``).
        """
        ds = ds or self.current_ds
        if ds is None:
            raise ValueError("No dataset provided and no current_ds is available.")

        df = ds.to_dataframe().reset_index()

        id_vars = [c for c in ("time", "lat", "lon", "latitude", "longitude") if c in df]
        value_vars = [v for v in ds.data_vars if v in df.columns]

        if not value_vars:
            raise ValueError("No variables in dataset available to melt into long format")

        df_long = df.melt(
            id_vars=id_vars,
            value_vars=value_vars,
            var_name="variable",
            value_name="value"
        )

        df_long["units"] = df_long["variable"].apply(
            lambda v: ds[v].attrs.get("units", "unknown")
        )
        if getattr(self.cfg, "dataset", None) == 'cmip':
            df_long["source_id"] = getattr(self.cfg, "source_id", None)
        df_long["source"] = getattr(self.cfg, "dataset", ds.attrs.get("source", "unknown"))
        df_long = df_long.drop_duplicates()
        self._gen_fn_cfg()
        return df_long

    # ----------------------------
    # Save CSV
    # ----------------------------
    def to_csv(self, df: Optional[pd.DataFrame] = None, filename: Optional[str] = None, format: str = "default") -> str:
        """Save a DataFrame to CSV with optional format specification.

        Args:
            df (pd.DataFrame, optional): DataFrame to save. Defaults to ``self.current_df``.
            filename (str, optional): Output filename/directory. Defaults to ``self.filename_csv``.
            format (str): Output format. Options:
                - 'default': Long-form (single file)
                - 'simplace': SIMPLACE format (splits by location, tab-separated)
                - 'monica': MONICA format (splits by location, tab-separated)
                Defaults to 'default'.

        Returns:
            str: The path of the written CSV file(s). For SIMPLACE/MONICA, returns base directory.

        Raises:
            ValueError: If format is not supported or required data is missing.
        """
        df = df if df is not None else self.current_df
        format_lower = format.lower()

        # Apply format conversion if requested
        if format_lower in ("simplace", "monica"):
            return self._convert_to_gridded_format(df, filename, format=format_lower)
        elif format_lower == "default":
            filename = filename or getattr(self, "filename_csv", None)
            if filename is None:
                raise ValueError("No filename provided and filename_csv is not set")

            path = Path(filename)
            path.parent.mkdir(parents=True, exist_ok=True)

            df.to_csv(filename, index=False, sep='\t')
            self.filename_csv = str(path)
            self.current_filename = str(path)

            self.logger.info(f"DataFrame saved to CSV file ({format} format): {self.current_filename}")

            return filename
        else:
            raise ValueError(f"Unsupported format: {format}. Supported formats: 'default', 'simplace', 'monica'")

    def _convert_to_gridded_format(self, df: pd.DataFrame, filename: Optional[str] = None, format: str = "simplace") -> str:
        """Convert long-form DataFrame to gridded SIMPLACE/MONICA format by row/column indices.

        Creates a folder structure: <base_dir>/<col_number>/<DATASET>_Daily_C<col>R<row>.csv

        Args:
            df (pd.DataFrame): Long-form DataFrame with columns: date/time, lat/x, lon/y, variable, value
            filename (str, optional): Base output directory. If None, creates 'simplace_output' or 'monica_output'
            format (str): Format type - 'simplace' or 'monica'

        Returns:
            str: Base directory path where all files were created

        Raises:
            ValueError: If DataFrame format is incompatible or required columns missing.
        """
        # Validate input DataFrame structure
        time_col = None
        for col in df.columns:
            if col.lower() in ('date', 'time'):
                time_col = col
                break
        if time_col is None:
            raise ValueError("DataFrame must contain 'date' or 'time' column")

        var_col = None
        for col in df.columns:
            if col.lower() == 'variable':
                var_col = col
                break
        if var_col is None:
            raise ValueError("DataFrame must contain 'variable' column")

        val_col = None
        for col in df.columns:
            if col.lower() in ('value', 'data'):
                val_col = col
                break
        if val_col is None:
            raise ValueError("DataFrame must contain 'value' or 'data' column")

        # Handle both lat/lon and x/y dimensions
        lat_col = next((c for c in df.columns if c.lower() in ('lat', 'latitude', 'y')), None)
        lon_col = next((c for c in df.columns if c.lower() in ('lon', 'longitude', 'x')), None)

        if lat_col is None or lon_col is None:
            raise ValueError("DataFrame must contain 'lat'/'latitude'/'y' and 'lon'/'longitude'/'x' columns")

        # Get dimensions from xarray Dataset if available
        if hasattr(self, 'ds') and self.ds is not None:
            ds = self.ds
            # Determine dimension names (handle lat/lon or y/x)
            lat_dim = next((d for d in ds.dims if d.lower() in ('lat', 'latitude', 'y')), None)
            lon_dim = next((d for d in ds.dims if d.lower() in ('lon', 'longitude', 'x')), None)

            if lat_dim and lon_dim:
                n_rows = ds.sizes[lat_dim]
                n_cols = ds.sizes[lon_dim]
                self.logger.info(f"Dataset dimensions: {n_rows} rows (lat/y) × {n_cols} columns (lon/x)")
            else:
                n_rows = df[lat_col].nunique()
                n_cols = df[lon_col].nunique()
                self.logger.info(f"Inferred dimensions: {n_rows} rows × {n_cols} columns")
        else:
            n_rows = df[lat_col].nunique()
            n_cols = df[lon_col].nunique()
            self.logger.info(f"DataFrame dimensions: {n_rows} rows × {n_cols} columns")

        # Create base output directory
        if filename is None:
            filename = f"{format}_{self.cfg.dataset}"
        base_dir = Path(filename)
        base_dir.mkdir(parents=True, exist_ok=True)

        # Get unique lat/lon pairs and assign row/column indices
        unique_locations = df.drop_duplicates(subset=[lat_col, lon_col]).sort_values([lat_col, lon_col]).reset_index(drop=True)

        # Create mapping from lat/lon to row/col indices
        lat_sorted = sorted(df[lat_col].unique(), reverse=True)  # rows decrease from north to south
        lon_sorted = sorted(df[lon_col].unique())  # columns increase from west to east

        lat_to_row = {lat: i for i, lat in enumerate(lat_sorted)}
        lon_to_col = {lon: j for j, lon in enumerate(lon_sorted)}

        n_locations = len(unique_locations)
        self.logger.info(f"Converting to {format.upper()} format with {n_locations} unique locations")

        created_files = []

        # Process each location
        for idx, (_, loc_row) in enumerate(unique_locations.iterrows(), 1):
            lat = loc_row[lat_col]
            lon = loc_row[lon_col]

            row_num = lat_to_row[lat]
            col_num = lon_to_col[lon]

            # Filter data for this location
            df_loc = df[(df[lat_col] == lat) & (df[lon_col] == lon)].copy()

            # Pivot: time x variables
            loc_df = df_loc.pivot_table(
                index=time_col,
                columns=var_col,
                values=val_col,
                aggfunc='first'
            ).reset_index()

            # Rename Date column
            loc_df.rename(columns={time_col: 'Date'}, inplace=True)

            # Rename variables from CF names to DWD/standard names
            rename_mapping = {}
            for col in loc_df.columns:
                if col in CF_TO_DWD_NAMES:
                    rename_mapping[col] = CF_TO_DWD_NAMES[col]
            loc_df.rename(columns=rename_mapping, inplace=True)

            # Generate filename
            variables_list = df_loc[var_col].unique()
            variables_named = [CF_TO_DWD_NAMES.get(v, v) for v in variables_list]
            variables_str = "_".join(variables_named)

            # Get dataset name from config
            dataset_name = getattr(self.cfg, 'dataset', 'unknown')

            # Create column-based folder named after the column index (no zero-padding)
            col_folder = base_dir / f"{col_num}"
            col_folder.mkdir(parents=True, exist_ok=True)

            # Save file: <DATASET>_Daily_C<col>R<row>.csv
            output_filename = col_folder / f"{dataset_name.upper()}_Daily_C{col_num}R{row_num}.csv"
            loc_df.to_csv(output_filename, index=False, sep='\t')

            created_files.append(str(output_filename))
            self.logger.debug(f"[{idx}/{n_locations}] Saved col_{col_num} (row {row_num}) -> {output_filename}")

        self.logger.info(f"Successfully created {len(created_files)} {format.upper()} format files in {base_dir}")
        self.logger.info(f"Grid dimensions: {n_rows} rows × {n_cols} columns")

        self.filename_csv = str(base_dir)
        self.current_filename = str(base_dir)

        return str(base_dir)

    def _convert_to_dwd_format(self, df: pd.DataFrame) -> tuple:
        """Convert long-form DataFrame to DWD single-station tabular format.

        DWD format characteristics:
        - One row per date (time series)
        - Columns: Date, Variables (Precipitation, TempMin, TempMean, TempMax, etc.)
        - Single location (gridcell identifier from source)
        - Tab-separated values
        - Filename: dwd_<n_lat>x<n_lon>_<variables>_<gridcell_id>.csv

        Args:
            df (pd.DataFrame): Long-form DataFrame with columns: date/time, lat, lon, variable, value

        Returns:
            tuple: (converted_df, suggested_filename)

        Raises:
            ValueError: If DataFrame format is incompatible or has multiple stations.
        """
        # Validate input DataFrame structure

        # Check for time column
        time_col = None
        for col in df.columns:
            if col.lower() in ('date', 'time'):
                time_col = col
                break
        if time_col is None:
            raise ValueError("DataFrame must contain 'date' or 'time' column")

        # Check for variable column
        var_col = None
        for col in df.columns:
            if col.lower() == 'variable':
                var_col = col
                break
        if var_col is None:
            raise ValueError("DataFrame must contain 'variable' column")

        # Check for value column
        val_col = None
        for col in df.columns:
            if col.lower() in ('value', 'data'):
                val_col = col
                break
        if val_col is None:
            raise ValueError("DataFrame must contain 'value' or 'data' column")

        # Get location info (lat, lon, gridcell)
        lat_col = next((c for c in df.columns if c.lower() in ('lat', 'latitude')), None)
        lon_col = next((c for c in df.columns if c.lower() in ('lon', 'longitude')), None)

        # Get unique stations/locations
        location_cols = [c for c in df.columns if c.lower() in ('lat', 'lon', 'latitude', 'longitude', 'gridcell', 'station_id', 'station')]
        if location_cols:
            n_locations = len(df.groupby(location_cols, sort=False))
            if n_locations > 1:
                raise ValueError(f"DWD format supports single station only, but found {n_locations} locations")

        # Pivot: time x variables
        dwd_df = df.pivot_table(
            index=time_col,
            columns=var_col,
            values=val_col,
            aggfunc='first'
        ).reset_index()

        # Rename Date column to match DWD standard
        dwd_df.rename(columns={time_col: 'Date'}, inplace=True)

        # Rename variables from CF names to DWD names
        rename_mapping = {}
        for col in dwd_df.columns:
            if col in CF_TO_DWD_NAMES:
                rename_mapping[col] = CF_TO_DWD_NAMES[col]
        dwd_df.rename(columns=rename_mapping, inplace=True)

        # Add gridcell identifier if available (from source data)
        if 'gridcell' in df.columns:
            gridcell = df['gridcell'].iloc[0]
        elif 'station_id' in df.columns:
            gridcell = df['station_id'].iloc[0]
        else:
            gridcell = "unknown"

        # Generate filename: dwd_<n_lat>x<n_lon>_<variables>_<gridcell_id>.csv
        n_lat = df[lat_col].nunique() if lat_col else 1
        n_lon = df[lon_col].nunique() if lon_col else 1

        # Use DWD names in filename for consistency
        variables_list = df[var_col].unique()
        variables_dwd = [CF_TO_DWD_NAMES.get(v, v) for v in variables_list]
        variables = "_".join(variables_dwd)

        # Clean gridcell identifier for filename (replace : with _)
        gridcell_safe = str(gridcell).replace(":", "_").replace("/", "_").replace(" ", "_")

        filename = f"dwd_{n_lat}x{n_lon}_{variables}_{gridcell_safe}.csv"

        self.logger.debug(f"Converted to DWD format: {n_lat}x{n_lon} grid, {dwd_df.shape[0]} time steps, {dwd_df.shape[1]-1} variables; renamed variables to DWD names: {rename_mapping}")

        return dwd_df, filename

    def to_nc(self, ds: Optional[xr.Dataset] = None, filename: Optional[str] = None) -> str:
        """Save an xarray Dataset to NetCDF.

        Notes:
            - If ``ds`` is ``None``: save ``current_ds``.
            - If ``filename`` is ``None``: use ``self.filename_nc``.
            - Creates directories if needed and updates ``self.filename_nc`` and ``self.current_filename``.

        Args:
            ds (xr.Dataset, optional): Dataset to save. If ``None``, uses ``self.current_ds``.
            filename (str, optional): Output filename. Defaults to ``self.filename_nc``.

        Returns:
            str: The path of the written NetCDF file.
        """

        # -------------------------------
        # 1. Determine dataset to save
        # -------------------------------
        ds = ds or getattr(self, "current_ds", None)
        if ds is None:
            raise ValueError("No dataset available to save")

        # -------------------------------
        # 2. Determine filename
        # -------------------------------
        filename = filename or getattr(self, "filename_nc", None)
        if filename is None:
            raise ValueError("No filename provided and filename_nc is not set")

        path = Path(filename)
        path.parent.mkdir(parents=True, exist_ok=True)

        # -------------------------------
        # 3. Save to NetCDF
        # -------------------------------
        ds.to_netcdf(path)

        # -------------------------------
        # 4. Track filenames
        # -------------------------------
        self.filename_nc = str(path)
        self.current_filename = str(path)

        # print(f"Dataset saved to NetCDF file: {self.current_filename}")
        self.logger.info(f"Dataset saved to NetCDF file: {self.current_filename}")

        return str(path)

    # ----------------------------
    # Unified workflow
    # ----------------------------
    def run_workflow(self, overrides: Optional[List[str]] = None,
                     actions: Optional[List[str]] = None,
                     file: Optional[str] = None) -> WorkflowResult:
        """Execute a sequence of workflow actions.

        Args:
            overrides (list[str], optional): Hydra overrides to apply (not all actions will use these).
            actions (list[str], optional): Ordered list of actions to perform. Supported actions include: 'upload_netcdf', 'upload_csv', 'extract', 'calc_index', 'to_dataframe', 'to_csv', 'to_nc'.
            file (str, optional): File path used for upload actions when required.

        Returns:
            WorkflowResult: Named result container with populated fields for dataframe/dataset/filenames.
        """
        actions = actions or ["extract", "calc_index", "to_csv", "to_nc"]
        result = WorkflowResult(cfg=self.cfg)
        for action in actions:
            self.logger.info("Starting action: %s", action)
            try:
                if action == "upload_netcdf":
                    if file is None:
                        raise ValueError(
                            "Action 'upload_netcdf' requires argument 'netcdf_file', "
                            "but none was provided."
                        )
                    # Validate extension
                    valid_nc_ext = (".nc", ".nc4", ".nc.gz")
                    if not any(str(file).lower().endswith(ext) for ext in valid_nc_ext):
                        raise ValueError(
                            f"Invalid file format for upload_netcdf: '{file}'. "
                            f"Expected one of: {valid_nc_ext}"
                        )
                    self.upload_netcdf(file)
                    result.dataset = self.current_ds

                elif action == "upload_csv":
                    if file is None:
                        raise ValueError(
                            "Action 'upload_csv' requires argument 'csv_file', "
                            "but none was provided."
                        )

                    # Validate CSV extension
                    valid_csv_ext = (".csv", ".csv.gz")
                    if not any(str(file).lower().endswith(ext) for ext in valid_csv_ext):
                        raise ValueError(
                            f"Invalid file format for upload_csv: '{file}'. "
                            f"Expected one of: {valid_csv_ext}"
                        )

                    self.upload_csv(file)
                    result.dataset = self.current_ds

                elif action == "extract":
                    if self.cfg.dataset is None:
                        raise ValueError(
                            "Action 'extract' cannot run because no dataset provider is set "
                            "(cfg.dataset is None)."
                        )
                    self.extract()
                    result.dataset = self.current_ds

                elif action == "calc_index":
                    if self.current_ds is None:
                        raise ValueError(
                            "Action 'calc_index' requires a dataset, but no dataset is available. "
                            "Upload or extract a dataset before computing an index."
                        )
                    self.calc_index()
                    result.index_ds = self.current_ds

                elif action == "to_csv":
                    if self.current_ds is None:
                        raise ValueError(
                            "Action 'to_dataframe' requires a dataset, but no dataset is available. "
                            "Upload or extract a dataset before converting to a DataFrame."
                        )
                    self.to_dataframe()
                    result.dataframe = self.current_df
                    result.filename = self.to_csv()

                elif action == "to_nc":
                    if self.current_ds is None:
                        raise ValueError(
                            "Action 'to_nc' requires a dataset, but no dataset is available. "
                            "Upload or extract a dataset before saving to NetCDF."
                        )
                    result.filename = self.to_nc()

                elif action == "impute":
                    if self.current_ds is None:
                        raise ValueError("Action 'impute' requires a dataset, but no dataset is available.")
                    self.impute()
                    result.dataset = self.current_ds
                    result.impute_ds = getattr(self, "impute_ds", None)

                else:
                    raise ValueError(f"Unknown action '{action}'")
                self.logger.info("Completed action: %s", action)
            except Exception:
                self.logger.exception("Action '%s' failed", action)
                raise

        return result

    # ----------------------------
    # Exploration helpers using cfg.dsinfo
    # ----------------------------
    def get_datasets(self) -> List[str]:
        """Return the list of dataset provider names available in configuration.

        Returns:
            List[str]: Names of available dataset providers from ``cfg.dsinfo``.
        """
        if not self.cfg or not hasattr(self.cfg, "dsinfo"):
            raise ValueError("Configuration or dsinfo not loaded")
        return list(self.cfg.dsinfo.keys())

    def get_variables(self, dataset: Optional[str] = None) -> List[str]:
        """Return the list of variables available for a dataset.

        Args:
            dataset (str, optional): Dataset name to query. Defaults to ``cfg.dataset``.

        Returns:
            List[str]: List of variable names.
        """
        if not self.cfg or not hasattr(self.cfg, "dsinfo"):
            raise ValueError("Configuration or dsinfo not loaded")

        dataset_name = dataset or getattr(self.cfg, "dataset", None)
        if dataset_name is None:
            raise ValueError("Dataset not specified and cfg.dataset is None")

        dsinfo = self.cfg.dsinfo.get(dataset_name)
        if not dsinfo or "variables" not in dsinfo:
            raise ValueError(f"No variable info available for dataset '{dataset_name}'")

        return list(dsinfo["variables"].keys())

    def get_varinfo(self, var: str) -> dict:
        """Get metadata for a variable from varinfo.

        Args:
            var (str): Name of the variable, e.g., 'tas', 'tasmax', 'pr'.

        Returns:
            dict: Metadata dictionary containing cf_name, long_name, units, etc.

        Raises:
            ValueError: If varinfo is not loaded or variable not found.
        """
        if not self.cfg or not hasattr(self.cfg, "varinfo") or not self.cfg.varinfo:
            raise ValueError("Configuration or varinfo not loaded")

        if var not in self.cfg.varinfo:
            raise ValueError(f"Variable '{var}' not found in varinfo")

        return self.cfg.varinfo[var]


    def get_actions(self) -> dict:
        """Return a dictionary of workflow actions with their outputs and descriptions.

        Supports ``actionsinfo`` in mapping style or list style and returns a consistent mapping of action name to description/output.

        Returns:
            dict: Mapping action name -> {'output': ..., 'description': ...}
        """
        if not self.cfg or not hasattr(self.cfg, "actionsinfo"):
            raise ValueError("Configuration or actionsinfo not loaded")

        actions_map = getattr(self.cfg, "actionsinfo")

        # If 'actions' key exists, fallback to list style
        if "actions" in actions_map:
            actions_map = {a["name"]: {"output": a["output"], "description": a["description"]}
                        for a in actions_map["actions"]}

        return actions_map

    def get_indices(self, variables: List[str], require_all: bool = True) -> Dict[str, dict]:
        """Fetch climate extreme indices from ``cfg.extinfo`` that involve the given variables.

        Args:
            variables (list[str]): Variables to filter indices by (if ``None``, uses ``cfg.variables``).
            require_all (bool): If True, return indices that require all provided variables; otherwise return indices if any variable matches.

        Returns:
            dict: Mapping index_name -> index_definition.
        """
        cfg = self.cfg
        variables = variables or cfg.variables 
        if not hasattr(cfg, "extinfo") or not cfg.extinfo:
            raise ValueError("cfg.extinfo is not defined or empty")

        indices_def = cfg.extinfo.get("indices", {})
        if not indices_def:
            return {}

        matched_indices = {}
        for idx_name, idx_info in indices_def.items():
            idx_vars = idx_info.get("variables", [])
            if require_all:
                if all(var in variables for var in idx_vars):
                    matched_indices[idx_name] = idx_info
            else:
                if any(var in variables for var in idx_vars):
                    matched_indices[idx_name] = idx_info

        return matched_indices

    # ----------------------------
    # Imputation
    # ----------------------------
    @update_ds(attr_name='impute_ds')
    def impute(self, ds: xr.Dataset = None) -> xr.Dataset:
        """Impute missing values using the configured imputation method.

        Args:
            ds (xr.Dataset, optional): Dataset to impute. If None, uses
                ``self.current_ds``.

        Returns:
            xr.Dataset | None: The imputed dataset (also sets
                ``self.current_ds`` and ``self.impute_ds``). Returns ``None``
                if no imputation method is configured.

        Raises:
            ValueError: If ``ds`` is ``None`` and ``self.current_ds`` is not set.
        """
        cfg = self.cfg
        impute_cfg = cfg.imputeinfo
        ds = ds or self.current_ds
        if ds is None:
            raise ValueError("No dataset provided and no current_ds is available.")

        if cfg.impute is None:
            self.logger.warning("No imputation method selected.")
            return None
        # select variables (optional)
        # variables = cfg.get("variables", None)
        # if variables:
        #     missing = [v for v in variables if v not in self.current_ds.data_vars]
        #     if missing:
        #         raise ValueError(f"Variables not present in dataset: {missing}")
        #     ds_in = self.current_ds[variables]
        # else:
        #     ds_in = self.current_ds

        method = cfg.impute
        normalize = impute_cfg[method].get("normalize", True)
        time_dim = cfg.dsinfo[cfg.dataset.upper()].get("time_dim", "time")
        lat_dim = cfg.dsinfo[cfg.dataset.upper()].get("lat_dim", "lat")
        lon_dim = cfg.dsinfo[cfg.dataset.upper()].get("lon_dim", "lon")
        # epochs = impute_cfg[method].get("epochs", 300)

        # run imputer (Imputer expects dims (time, lat, lon))
        imputer = Imputer(
            ds,
            time_dim=time_dim,
            lat_dim=lat_dim,
            lon_dim=lon_dim,
            method=method,
            normalize=normalize,
        )
        recovered = imputer.impute()

        # merge imputed variables back into original dataset if we operated on a subset

        ds_out = recovered

        # Return dataset (decorator will set current_ds and impute_ds and generate filenames)
        return ds_out

    def get_impute_methods(self) -> Dict[str, dict]:
        """Return mapping of available imputation methods from config.

        Returns:
            Dict[str, dict]: Mapping of method name -> config (empty dict if none configured).
        """
        if not hasattr(self.cfg, "imputeinfo") or not self.cfg.imputeinfo:
            return {}
        return dict(self.cfg.imputeinfo)

    def configure_logging(self, level=logging.INFO, handler: logging.Handler = None):
        """Configure logging for this extractor instance.

        Args:
            level (int, optional): Logging level (default: ``logging.INFO``).
            handler (logging.Handler, optional): Handler to add; if ``None``, a default StreamHandler is created.
        """
        if handler is None:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
        # Avoid adding duplicate handlers
        if not any(isinstance(h, handler.__class__) for h in self.logger.handlers):
            self.logger.addHandler(handler)
        self.logger.setLevel(level)
        # also set module logger level
        logger.setLevel(level)

__init__(self, cfg_name='config', conf_path=None, overrides=None) special

Initialize the workflow manager and load configuration.

Parameters:

- cfg_name (str): Name of the Hydra configuration. Default: 'config'.
- conf_path (str, optional): Optional config path override. Default: None.
- overrides (list[str], optional): Hydra overrides to apply to the configuration. Default: None.
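
A minimal instantiation sketch; the override strings are illustrative and depend on your local Hydra config:

from climdata.utils.wrapper_workflow import ClimateExtractor

extractor = ClimateExtractor(cfg_name="config", overrides=["dataset=cmip", "region=europe"])
print(extractor.cfg.dataset)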
Source code in climdata/utils/wrapper_workflow.py
def __init__(self, cfg_name="config", conf_path=None, overrides: Optional[List[str]] = None):
    """Initialize the workflow manager and load configuration.

    Args:
        cfg_name (str): Name of the Hydra configuration (default: "config").
        conf_path (str, optional): Optional config path override.
        overrides (list[str], optional): Hydra overrides to apply to the configuration.
    """
    self.cfg_name = cfg_name
    self.conf_path = conf_path
    self.cfg: Optional[DictConfig] = None

    # Stage datasets
    self.ds = None
    self.current_ds = None
    self.index_ds = None
    self.impute_ds = None
    self.bias_corrected_ds = None

    # Stage DataFrames
    self.raw_df = None
    self.current_df = None
    self.index_df = None
    self.impute_df = None
    self.bias_corrected_df = None
    self.df = None  # alias for current_df

    # filenames
    self.filename = None
    self.filetype = None

    # Automatically load config on init
    self.load_config(overrides)
    self.cfg = self.preprocess_aoi(self.cfg)

    # instance logger for this extractor
    self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

calc_index(self, ds=None)

Calculate the configured extreme index using xclim indices.

Parameters:

- ds (xr.Dataset, optional): Dataset to operate on. If None, self.current_ds is used. Default: None.

Returns:

- xr.Dataset: The computed index as an xarray Dataset (also sets self.index_ds).
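
A short usage sketch, assuming cfg.index is set and a dataset was already extracted or uploaded (some_ds below is a hypothetical xr.Dataset):

idx_ds = extractor.calc_index()            # operates on extractor.current_ds
idx_ds = extractor.calc_index(ds=some_ds)  # or pass a dataset explicitly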

Source code in climdata/utils/wrapper_workflow.py
@update_ds(attr_name='index_ds')
def calc_index(self, ds: xr.Dataset = None) -> xr.Dataset:
    """Calculate the configured extreme index using xclim indices.

    Args:
        ds (xr.Dataset, optional): Dataset to operate on. If ``None``, ``self.current_ds`` is used.

    Returns:
        xr.Dataset: The computed index as an xarray Dataset (also sets ``self.index_ds``).
    """
    cfg = self.cfg

    # Use provided ds or fallback
    ds = ds or self.current_ds
    if ds is None:
        raise ValueError("No dataset provided and no current_ds is available.")

    if cfg.index is None:
        self.logger.info("No index selected.")
        return None

    if "time" in ds.coords:
        # Handle both numpy datetime64 and cftime datetime objects
        time_values = ds.time.values
        try:
            # Try pandas conversion first (works for datetime64)
            years = pd.to_datetime(time_values).year
        except (TypeError, ValueError):
            # Fall back to cftime handling
            years = np.array([t.year for t in time_values])

        n_years = len(pd.unique(years))
        if n_years < 30:
            warnings.warn(f"Index {cfg.index} usually requires ≥30 years, got {n_years}", UserWarning)

    indices = extreme_index(cfg, ds)
    index_ds = indices.calculate(cfg.index).compute()
    index_ds = index_ds.to_dataset(name=cfg.index)

    return index_ds

configure_logging(self, level=20, handler=None)

Configure logging for this extractor instance.

Parameters:

- level (int, optional): Logging level. Default: logging.INFO (20).
- handler (logging.Handler, optional): Handler to add; if None, a default StreamHandler is created. Default: None.
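
For example, to raise verbosity on this extractor instance using the default StreamHandler:

import logging

extractor.configure_logging(level=logging.DEBUG)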
Source code in climdata/utils/wrapper_workflow.py
def configure_logging(self, level=logging.INFO, handler: logging.Handler = None):
    """Configure logging for this extractor instance.

    Args:
        level (int, optional): Logging level (default: ``logging.INFO``).
        handler (logging.Handler, optional): Handler to add; if ``None``, a default StreamHandler is created.
    """
    if handler is None:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
    # Avoid adding duplicate handlers
    if not any(isinstance(h, handler.__class__) for h in self.logger.handlers):
        self.logger.addHandler(handler)
    self.logger.setLevel(level)
    # also set module logger level
    logger.setLevel(level)
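
Example (a sketch, assuming an already constructed extractor instance; any logging.Handler can be supplied, and the log file path shown here is hypothetical):

    import logging

    handler = logging.FileHandler("climdata_extract.log")
    extractor.configure_logging(level=logging.DEBUG, handler=handler)
    extractor.extract()   # progress messages now go to the file at DEBUG level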

extract(self)

Extract data from the configured provider using self.cfg.

Uses provider-specific classes (e.g., CMIP, DWD, MSWX, HYRAS, POWER) to fetch, load and extract datasets. When extraction completes, units are converted to those declared in cfg.varinfo, the dataset is computed, and filenames are generated from the configuration.

Returns:

| Type | Description |
|------|-------------|
| xr.Dataset | The extracted and computed dataset (also sets self.current_ds). |

Source code in climdata/utils/wrapper_workflow.py
@update_ds(attr_name='ds')
def extract(self) -> xr.Dataset:
    """Extract data from the configured provider using ``self.cfg``.

    Uses provider-specific classes (e.g., ``CMIP``, ``DWD``, ``MSWX``, ``HYRAS``, ``POWER``)
    to fetch, load and extract datasets. When extraction completes, units are converted to those declared in ``cfg.varinfo``, the dataset is computed, and filenames are generated from the configuration.

    Returns:
        xr.Dataset: The extracted and computed dataset (also sets ``self.current_ds``).
    """
    cfg = self.cfg
    extract_kwargs = {}

    if cfg.lat is not None and cfg.lon is not None:
        extract_kwargs["point"] = (cfg.lon, cfg.lat)
        if cfg.dataset.upper() == "DWD":
            extract_kwargs["buffer_km"] = 30
    elif cfg.region is not None:
        extract_kwargs["box"] = cfg.bounds[cfg.region]
    elif cfg.shapefile is not None:
        extract_kwargs["shapefile"] = cfg.shapefile

    ds = None
    dataset_upper = cfg.dataset.upper()

    if dataset_upper == "MSWX":
        ds_vars = []
        for var in cfg.variables:
            mswx = climdata.MSWX(cfg)
            mswx.extract(**extract_kwargs)
            mswx.load(var)
            ds_vars.append(mswx.dataset)
        ds = xr.merge(ds_vars)
        self.dataset_class = mswx
    elif dataset_upper == "CMIP":
        cmip = climdata.CMIP(cfg)
        cmip.fetch()
        cmip.load()
        cmip.extract(**extract_kwargs)
        ds = cmip.ds
        self.dataset_class = cmip
    elif dataset_upper == "POWER":
        power = climdata.POWER(cfg)
        power.fetch()
        power.load()
        ds = power.ds
        self.dataset_class = power
    elif dataset_upper == "DWD":
        ds_vars = []
        for var in cfg.variables:
            dwd = climdata.DWD(cfg)
            ds_var = dwd.extract(variable=var, **extract_kwargs)
            ds_vars.append(ds_var)
        ds = xr.merge(ds_vars)
        self.dataset_class = dwd
    elif dataset_upper == "HYRAS":
        hyras = climdata.HYRAS(cfg)
        ds_vars = []
        for var in cfg.variables:
            hyras.extract(**extract_kwargs)
            ds_vars.append(hyras.load(var, chunking={'time':"auto"})[[var]])
        ds = xr.merge(ds_vars, compat="override")
        self.dataset_class = hyras
    elif dataset_upper == "HOSTRADA":
        hostrada = climdata.HOSTRADA(cfg)
        hostrada.extract(**extract_kwargs)
        ds_vars = []
        for var in cfg.variables:
            ds_vars.append(hostrada.load(var, chunking={'time': 'auto'})[[var]])
        ds = xr.merge(ds_vars, compat="override")
        self.dataset_class = hostrada
    elif dataset_upper == "W5E5":
        w5e5 = climdata.W5E5(cfg)
        w5e5.fetch()  # Download from ISIMIP
        w5e5.load()   # Load into xarray
        w5e5.extract(**extract_kwargs)
        ds = w5e5.ds
        self.dataset_class = w5e5
    elif dataset_upper == "CMIP_W5E5":
        cmip_w5e5 = climdata.CMIPW5E5(cfg)
        cmip_w5e5.fetch()  # Download CMIP6 data from ISIMIP
        cmip_w5e5.load()   # Load into xarray
        cmip_w5e5.extract(**extract_kwargs)
        ds = cmip_w5e5.ds
        self.dataset_class = cmip_w5e5
    elif dataset_upper == "NEXGDDP":
        nexgddp = climdata.NEXGDDP(cfg)
        nexgddp.fetch()  # Download NEX-GDDP-CMIP6 data from NASA THREDDS
        nexgddp.load()   # Load into xarray
        nexgddp.extract(**extract_kwargs)
        ds = nexgddp.ds
        self.dataset_class = nexgddp
    elif dataset_upper == "AGRI_ISIMIP":
        agri = climdata.AGRI_ISIMIP(cfg)
        agri.fetch()  # Download agricultural data from ISIMIP
        ds_vars = []
        for var in cfg.variables:
            ds_vars.append(agri.load(var, chunking={'time': 'auto'})[[var]])
        ds = xr.merge(ds_vars, compat="override")
        agri.extract(**extract_kwargs)
        self.dataset_class = agri
    else:
        raise ValueError(f"Unsupported dataset provider: '{cfg.dataset}'")

    for var in ds.data_vars:
        ds[var] = xclim.core.units.convert_units_to(ds[var], cfg.varinfo[var].units)

    # ds = ds.compute()

    return ds
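
Example (a sketch of point versus region extraction; the override names and values are illustrative and depend on the config groups defined for your installation):

    # Point extraction: lat/lon set, provider-specific buffering applied where relevant (e.g. DWD).
    point_extractor = ClimateExtractor(overrides=["dataset=mswx", "lat=50.9", "lon=6.4"])
    ds_point = point_extractor.extract()

    # Region extraction: a named region resolved through cfg.bounds.
    region_extractor = ClimateExtractor(overrides=["dataset=cmip", "region=europe"])
    ds_region = region_extractor.extract()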

get_actions(self)

Return a dictionary of workflow actions with their outputs and descriptions.

Supports actionsinfo in mapping style or list style and returns a consistent mapping of action name to description/output.

Returns:

| Type | Description |
|------|-------------|
| dict | Mapping action name -> {'output': ..., 'description': ...} |

Source code in climdata/utils/wrapper_workflow.py
def get_actions(self) -> dict:
    """Return a dictionary of workflow actions with their outputs and descriptions.

    Supports ``actionsinfo`` in mapping style or list style and returns a consistent mapping of action name to description/output.

    Returns:
        dict: Mapping action name -> {'output': ..., 'description': ...}
    """
    if not self.cfg or not hasattr(self.cfg, "actionsinfo"):
        raise ValueError("Configuration or actionsinfo not loaded")

    actions_map = getattr(self.cfg, "actionsinfo")

    # If 'actions' key exists, fallback to list style
    if "actions" in actions_map:
        actions_map = {a["name"]: {"output": a["output"], "description": a["description"]}
                    for a in actions_map["actions"]}

    return actions_map

get_datasets(self)

Return the list of dataset provider names available in configuration.

Returns:

| Type | Description |
|------|-------------|
| List[str] | Names of available dataset providers from cfg.dsinfo. |

Source code in climdata/utils/wrapper_workflow.py
def get_datasets(self) -> List[str]:
    """Return the list of dataset provider names available in configuration.

    Returns:
        List[str]: Names of available dataset providers from ``cfg.dsinfo``.
    """
    if not self.cfg or not hasattr(self.cfg, "dsinfo"):
        raise ValueError("Configuration or dsinfo not loaded")
    return list(self.cfg.dsinfo.keys())

get_impute_methods(self)

Return mapping of available imputation methods from config.

Returns:

| Type | Description |
|------|-------------|
| Dict[str, dict] | Mapping of method name -> config (empty dict if none configured). |

Source code in climdata/utils/wrapper_workflow.py
def get_impute_methods(self) -> Dict[str, dict]:
    """Return mapping of available imputation methods from config.

    Returns:
        Dict[str, dict]: Mapping of method name -> config (empty dict if none configured).
    """
    if not hasattr(self.cfg, "imputeinfo") or not self.cfg.imputeinfo:
        return {}
    return dict(self.cfg.imputeinfo)

get_indices(self, variables, require_all=True)

Fetch climate extreme indices from cfg.extinfo that involve the given variables.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| variables | list[str] | Variables to filter indices by (if None, uses cfg.variables). | required |
| require_all | bool | If True, return indices that require all provided variables; otherwise return indices if any variable matches. | True |

Returns:

| Type | Description |
|------|-------------|
| dict | Mapping index_name -> index_definition. |

Source code in climdata/utils/wrapper_workflow.py
def get_indices(self, variables: List[str], require_all: bool = True) -> Dict[str, dict]:
    """Fetch climate extreme indices from ``cfg.extinfo`` that involve the given variables.

    Args:
        variables (list[str]): Variables to filter indices by (if ``None``, uses ``cfg.variables``).
        require_all (bool): If True, return indices that require all provided variables; otherwise return indices if any variable matches.

    Returns:
        dict: Mapping index_name -> index_definition.
    """
    cfg = self.cfg
    variables = variables or cfg.variables 
    if not hasattr(cfg, "extinfo") or not cfg.extinfo:
        raise ValueError("cfg.extinfo is not defined or empty")

    indices_def = cfg.extinfo.get("indices", {})
    if not indices_def:
        return {}

    matched_indices = {}
    for idx_name, idx_info in indices_def.items():
        idx_vars = idx_info.get("variables", [])
        if require_all:
            if all(var in variables for var in idx_vars):
                matched_indices[idx_name] = idx_info
        else:
            if any(var in variables for var in idx_vars):
                matched_indices[idx_name] = idx_info

    return matched_indices
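
Example (a sketch, assuming an already constructed extractor instance; the variable names must match those used in cfg.extinfo):

    # Indices computable from daily maximum temperature alone.
    tx_indices = extractor.get_indices(["tasmax"], require_all=True)
    for name, info in tx_indices.items():
        print(name, info.get("variables"))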

get_variables(self, dataset=None)

Return the list of variables available for a dataset.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| dataset | str | Dataset name to query. Defaults to cfg.dataset. | None |

Returns:

| Type | Description |
|------|-------------|
| List[str] | List of variable names. |

Source code in climdata/utils/wrapper_workflow.py
def get_variables(self, dataset: Optional[str] = None) -> List[str]:
    """Return the list of variables available for a dataset.

    Args:
        dataset (str, optional): Dataset name to query. Defaults to ``cfg.dataset``.

    Returns:
        List[str]: List of variable names.
    """
    if not self.cfg or not hasattr(self.cfg, "dsinfo"):
        raise ValueError("Configuration or dsinfo not loaded")

    dataset_name = dataset or getattr(self.cfg, "dataset", None)
    if dataset_name is None:
        raise ValueError("Dataset not specified and cfg.dataset is None")

    dsinfo = self.cfg.dsinfo.get(dataset_name)
    if not dsinfo or "variables" not in dsinfo:
        raise ValueError(f"No variable info available for dataset '{dataset_name}'")

    return list(dsinfo["variables"].keys())
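
Example (a sketch, assuming an already constructed extractor instance; the provider key "DWD" is illustrative and must be present in cfg.dsinfo):

    print(extractor.get_datasets())        # all configured providers
    print(extractor.get_variables("DWD"))  # variables configured for one provider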

get_varinfo(self, var)

Get metadata for a variable from varinfo.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| var | str | Name of the variable, e.g., 'tas', 'tasmax', 'pr'. | required |

Returns:

| Type | Description |
|------|-------------|
| dict | Metadata dictionary containing cf_name, long_name, units, etc. |

Exceptions:

| Type | Description |
|------|-------------|
| ValueError | If varinfo is not loaded or the variable is not found. |

Source code in climdata/utils/wrapper_workflow.py
def get_varinfo(self, var: str) -> dict:
    """Get metadata for a variable from varinfo.

    Args:
        var (str): Name of the variable, e.g., 'tas', 'tasmax', 'pr'.

    Returns:
        dict: Metadata dictionary containing cf_name, long_name, units, etc.

    Raises:
        ValueError: If varinfo is not loaded or variable not found.
    """
    if not self.cfg or not hasattr(self.cfg, "varinfo") or not self.cfg.varinfo:
        raise ValueError("Configuration or varinfo not loaded")

    if var not in self.cfg.varinfo:
        raise ValueError(f"Variable '{var}' not found in varinfo")

    return self.cfg.varinfo[var]

impute(self, ds=None)

Impute missing values using the configured imputation method.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| ds | xr.Dataset | Dataset to impute. If None, uses self.current_ds. | None |

Returns:

| Type | Description |
|------|-------------|
| xr.Dataset or None | The imputed dataset (also sets self.current_ds and self.impute_ds). Returns None if no imputation method is configured. |

Exceptions:

| Type | Description |
|------|-------------|
| ValueError | If ds is None and self.current_ds is not set. |

Source code in climdata/utils/wrapper_workflow.py
@update_ds(attr_name='impute_ds')
def impute(self, ds: xr.Dataset = None) -> xr.Dataset:
    """Impute missing values using the configured imputation method.

    Args:
        ds (xr.Dataset, optional): Dataset to impute. If None, uses
            ``self.current_ds``.

    Returns:
        xr.Dataset | None: The imputed dataset (also sets
            ``self.current_ds`` and ``self.impute_ds``). Returns ``None``
            if no imputation method is configured.

    Raises:
        ValueError: If ``ds`` is ``None`` and ``self.current_ds`` is not set.
    """
    cfg = self.cfg
    impute_cfg = cfg.imputeinfo
    ds = ds or self.current_ds
    if ds is None:
        raise ValueError("No dataset provided and no current_ds is available.")

    if cfg.impute is None:
        self.logger.warning("No imputation method selected.")
        return None
    # select variables (optional)
    # variables = cfg.get("variables", None)
    # if variables:
    #     missing = [v for v in variables if v not in self.current_ds.data_vars]
    #     if missing:
    #         raise ValueError(f"Variables not present in dataset: {missing}")
    #     ds_in = self.current_ds[variables]
    # else:
    #     ds_in = self.current_ds

    method = cfg.impute
    normalize = impute_cfg[method].get("normalize", True)
    time_dim = cfg.dsinfo[cfg.dataset.upper()].get("time_dim", "time")
    lat_dim = cfg.dsinfo[cfg.dataset.upper()].get("lat_dim", "lat")
    lon_dim = cfg.dsinfo[cfg.dataset.upper()].get("lon_dim", "lon")
    # epochs = impute_cfg[method].get("epochs", 300)

    # run imputer (Imputer expects dims (time, lat, lon))
    imputer = Imputer(
        ds,
        time_dim=time_dim,
        lat_dim=lat_dim,
        lon_dim=lon_dim,
        method=method,
        normalize=normalize,
    )
    recovered = imputer.impute()

    # merge imputed variables back into original dataset if we operated on a subset

    ds_out = recovered

    # Return dataset (decorator will set current_ds and impute_ds and generate filenames)
    return ds_out
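
Example (a sketch; 'impute=linear' is an illustrative override and must name a method defined in cfg.imputeinfo):

    extractor = ClimateExtractor(overrides=["dataset=hyras", "lat=51.0", "lon=10.0", "impute=linear"])
    extractor.extract()
    filled = extractor.impute()   # gap-filled dataset, also stored on extractor.impute_ds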

load_config(self, overrides=None)

Load and compose the Hydra configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| overrides | list[str] | Hydra overrides to apply when composing the configuration. | None |

Returns:

| Type | Description |
|------|-------------|
| DictConfig | Composed Hydra configuration object stored on self.cfg. |

Source code in climdata/utils/wrapper_workflow.py
def load_config(self, overrides: Optional[List[str]] = None) -> DictConfig:
    """Load and compose the Hydra configuration.

    Args:
        overrides (list[str], optional): Hydra overrides to apply when composing the configuration.

    Returns:
        DictConfig: Composed Hydra configuration object stored on ``self.cfg``.
    """
    overrides = overrides or []
    conf_dir = _ensure_local_conf()
    rel_conf_dir = os.path.relpath(conf_dir, os.path.dirname(__file__))

    if not GlobalHydra.instance().is_initialized():
        hydra_ctx = initialize(config_path=rel_conf_dir, version_base=None)
    else:
        hydra_ctx = None

    if hydra_ctx:
        with hydra_ctx:
            self.cfg = compose(config_name=self.cfg_name, overrides=overrides)
    else:
        self.cfg = compose(config_name=self.cfg_name, overrides=overrides)
    return self.cfg
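
Example (a sketch, assuming an already constructed extractor instance; the overrides are illustrative Hydra key=value pairs):

    cfg = extractor.load_config(overrides=["dataset=power", "lat=48.1", "lon=11.6"])
    print(cfg.dataset, cfg.lat, cfg.lon)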

preprocess_aoi(self, cfg)

Process an 'aoi' specification in the configuration.

Supports GeoJSON strings or dictionaries for FeatureCollection, Feature, or simple geometry objects (Point/Polygon).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| cfg | DictConfig | Configuration object with optional aoi entry. | required |

Returns:

| Type | Description |
|------|-------------|
| DictConfig | The modified configuration. When a Point is provided, cfg.lat and cfg.lon are set; when a Polygon is provided, cfg.bounds is set and cfg.region is set to "custom". |

Source code in climdata/utils/wrapper_workflow.py
def preprocess_aoi(self, cfg: DictConfig) -> DictConfig:
    """Process an 'aoi' specification in the configuration.

    Supports GeoJSON strings or dictionaries for FeatureCollection, Feature, or simple geometry objects (Point/Polygon).

    Args:
        cfg (DictConfig): Configuration object with optional ``aoi`` entry.

    Returns:
        DictConfig: The modified configuration. When a Point is provided, ``cfg.lat`` and ``cfg.lon`` are set; when a Polygon is provided, ``cfg.bounds`` is set and ``cfg.region`` is set to "custom".
    """
    if not hasattr(cfg, "aoi") or cfg.aoi is None:
        return cfg

    if isinstance(cfg.aoi, str):
        try:
            cfg.aoi = json.loads(cfg.aoi)
        except json.JSONDecodeError:
            raise ValueError("Invalid AOI JSON string")

    aoi = cfg.aoi

    if aoi.get("type") == "FeatureCollection":
        geom = shape(aoi["features"][0]["geometry"])
    elif aoi.get("type") == "Feature":
        geom = shape(aoi["geometry"])
    elif "type" in aoi:
        geom = shape(aoi)
    else:
        raise ValueError(f"Unsupported AOI format: {aoi}")

    if isinstance(geom, Point):
        cfg.lat = geom.y
        cfg.lon = geom.x
        cfg.bounds = None
    elif isinstance(geom, Polygon):
        minx, miny, maxx, maxy = geom.bounds
        cfg.bounds = {"custom": {"lat_min": miny, "lat_max": maxy,
                                 "lon_min": minx, "lon_max": maxx}}
        cfg.region = "custom"
        cfg.lat = None
        cfg.lon = None
    else:
        raise ValueError(f"Unknown geometry type {geom.geom_type}")

    return cfg
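
Example (a sketch, assuming an already constructed extractor instance and a composed configuration that accepts an aoi entry; note that GeoJSON coordinates are ordered [lon, lat]):

    import json

    extractor.cfg.aoi = json.dumps({"type": "Point", "coordinates": [6.4, 50.9]})
    extractor.cfg = extractor.preprocess_aoi(extractor.cfg)
    print(extractor.cfg.lat, extractor.cfg.lon)   # 50.9 6.4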

run_workflow(self, overrides=None, actions=None, file=None)

Execute a sequence of workflow actions.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| overrides | list[str] | Hydra overrides to apply (not all actions will use these). | None |
| actions | list[str] | Ordered list of actions to perform. Supported actions: 'upload_netcdf', 'upload_csv', 'extract', 'calc_index', 'impute', 'to_csv', 'to_nc'. | None |
| file | str | File path used for upload actions when required. | None |

Returns:

| Type | Description |
|------|-------------|
| WorkflowResult | Named result container with populated fields for dataframe/dataset/filenames. |

Source code in climdata/utils/wrapper_workflow.py
def run_workflow(self, overrides: Optional[List[str]] = None,
                 actions: Optional[List[str]] = None,
                 file: Optional[str] = None) -> WorkflowResult:
    """Execute a sequence of workflow actions.

    Args:
        overrides (list[str], optional): Hydra overrides to apply (not all actions will use these).
        actions (list[str], optional): Ordered list of actions to perform. Supported actions: 'upload_netcdf', 'upload_csv', 'extract', 'calc_index', 'impute', 'to_csv', 'to_nc'.
        file (str, optional): File path used for upload actions when required.

    Returns:
        WorkflowResult: Named result container with populated fields for dataframe/dataset/filenames.
    """
    actions = actions or ["extract", "calc_index", "to_csv", "to_nc"]
    result = WorkflowResult(cfg=self.cfg)
    for action in actions:
        self.logger.info("Starting action: %s", action)
        try:
            if action == "upload_netcdf":
                if file is None:
                    raise ValueError(
                        "Action 'upload_netcdf' requires argument 'netcdf_file', "
                        "but none was provided."
                    )
                # Validate extension
                valid_nc_ext = (".nc", ".nc4", ".nc.gz")
                if not any(str(file).lower().endswith(ext) for ext in valid_nc_ext):
                    raise ValueError(
                        f"Invalid file format for upload_netcdf: '{file}'. "
                        f"Expected one of: {valid_nc_ext}"
                    )
                self.upload_netcdf(file)
                result.dataset = self.current_ds

            elif action == "upload_csv":
                if file is None:
                    raise ValueError(
                        "Action 'upload_csv' requires argument 'csv_file', "
                        "but none was provided."
                    )

                # Validate CSV extension
                valid_csv_ext = (".csv", ".csv.gz")
                if not any(str(file).lower().endswith(ext) for ext in valid_csv_ext):
                    raise ValueError(
                        f"Invalid file format for upload_csv: '{file}'. "
                        f"Expected one of: {valid_csv_ext}"
                    )

                self.upload_csv(file)
                result.dataset = self.current_ds

            elif action == "extract":
                if self.cfg.dataset is None:
                    raise ValueError(
                        "Action 'extract' cannot run because no dataset provider is set "
                        "(cfg.dataset is None)."
                    )
                self.extract()
                result.dataset = self.current_ds

            elif action == "calc_index":
                if self.current_ds is None:
                    raise ValueError(
                        "Action 'calc_index' requires a dataset, but no dataset is available. "
                        "Upload or extract a dataset before computing an index."
                    )
                self.calc_index()
                result.index_ds = self.current_ds

            elif action == "to_csv":
                if self.current_ds is None:
                    raise ValueError(
                        "Action 'to_dataframe' requires a dataset, but no dataset is available. "
                        "Upload or extract a dataset before converting to a DataFrame."
                    )
                self.to_dataframe()
                result.dataframe = self.current_df
                result.filename = self.to_csv()

            elif action == "to_nc":
                if self.current_ds is None:
                    raise ValueError(
                        "Action 'to_nc' requires a dataset, but no dataset is available. "
                        "Upload or extract a dataset before saving to NetCDF."
                    )
                result.filename = self.to_nc()

            elif action == "impute":
                if self.current_ds is None:
                    raise ValueError("Action 'impute' requires a dataset, but no dataset is available.")
                self.impute()
                result.dataset = self.current_ds
                result.impute_ds = getattr(self, "impute_ds", None)

            else:
                raise ValueError(f"Unknown action '{action}'")
            self.logger.info("Completed action: %s", action)
        except Exception:
            self.logger.exception("Action '%s' failed", action)
            raise

    return result
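
Example (a sketch; override and index values are illustrative, as is the uploaded file name):

    extractor = ClimateExtractor(overrides=["dataset=cmip", "region=europe", "index=tx90p"])
    result = extractor.run_workflow(actions=["extract", "calc_index", "to_csv", "to_nc"])
    print(result.filename)

    # Start from an uploaded NetCDF file instead of a provider:
    result = extractor.run_workflow(actions=["upload_netcdf", "to_csv"], file="mydata.nc")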

to_csv(self, df=None, filename=None, format='default')

Save a DataFrame to CSV with optional format specification.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | pd.DataFrame | DataFrame to save. Defaults to self.current_df. | None |
| filename | str | Output filename/directory. Defaults to self.filename_csv. | None |
| format | str | Output format: 'default' (long-form, single file), 'simplace' (SIMPLACE format, splits by location, tab-separated), or 'monica' (MONICA format, splits by location, tab-separated). | 'default' |

Returns:

| Type | Description |
|------|-------------|
| str | The path of the written CSV file(s). For SIMPLACE/MONICA, returns the base directory. |

Exceptions:

| Type | Description |
|------|-------------|
| ValueError | If the format is not supported or required data is missing. |

Source code in climdata/utils/wrapper_workflow.py
def to_csv(self, df: Optional[pd.DataFrame] = None, filename: Optional[str] = None, format: str = "default") -> str:
    """Save a DataFrame to CSV with optional format specification.

    Args:
        df (pd.DataFrame, optional): DataFrame to save. Defaults to ``self.current_df``.
        filename (str, optional): Output filename/directory. Defaults to ``self.filename_csv``.
        format (str): Output format. Options:
            - 'default': Long-form (single file)
            - 'simplace': SIMPLACE format (splits by location, tab-separated)
            - 'monica': MONICA format (splits by location, tab-separated)
            Defaults to 'default'.

    Returns:
        str: The path of the written CSV file(s). For SIMPLACE/MONICA, returns base directory.

    Raises:
        ValueError: If format is not supported or required data is missing.
    """
    df = df if df is not None else self.current_df
    format_lower = format.lower()

    # Apply format conversion if requested
    if format_lower in ("simplace", "monica"):
        return self._convert_to_gridded_format(df, filename, format=format_lower)
    elif format_lower == "default":
        filename = filename or getattr(self, "filename_csv", None)
        if filename is None:
            raise ValueError("No filename provided and filename_csv is not set")

        path = Path(filename)
        path.parent.mkdir(parents=True, exist_ok=True)

        df.to_csv(filename, index=False, sep='\t')
        self.filename_csv = str(path)
        self.current_filename = str(path)

        self.logger.info(f"DataFrame saved to CSV file ({format} format): {self.current_filename}")

        return filename
    else:
        raise ValueError(f"Unsupported format: {format}. Supported formats: 'default', 'simplace', 'monica'")

to_dataframe(self, ds=None)

Convert a dataset to a long-form pandas DataFrame.

The output contains columns: time, lat, lon (or latitude/longitude), variable, value, units, source.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| ds | xr.Dataset | Dataset to convert. If None, uses self.current_ds. | None |

Returns:

| Type | Description |
|------|-------------|
| pd.DataFrame | Long-form DataFrame (also sets self.current_df). |

Source code in climdata/utils/wrapper_workflow.py
@update_df()
def to_dataframe(self, ds: xr.Dataset = None) -> pd.DataFrame:
    """Convert a dataset to a long-form pandas DataFrame.

    The output contains columns: time, lat, lon (or latitude/longitude), variable, value, units, source.

    Args:
        ds (xr.Dataset, optional): Dataset to convert. If ``None``, uses ``self.current_ds``.

    Returns:
        pd.DataFrame: Long-form DataFrame (also sets ``self.current_df``).
    """
    ds = ds or self.current_ds
    if ds is None:
        raise ValueError("No dataset provided and no current_ds is available.")

    df = ds.to_dataframe().reset_index()

    id_vars = [c for c in ("time", "lat", "lon", "latitude", "longitude") if c in df]
    value_vars = [v for v in ds.data_vars if v in df.columns]

    if not value_vars:
        raise ValueError("No variables in dataset available to melt into long format")

    df_long = df.melt(
        id_vars=id_vars,
        value_vars=value_vars,
        var_name="variable",
        value_name="value"
    )

    df_long["units"] = df_long["variable"].apply(
        lambda v: ds[v].attrs.get("units", "unknown")
    )
    if getattr(self.cfg, "dataset", None) == 'cmip':
        df_long["source_id"] = getattr(self.cfg, "source_id")
    df_long["source"] = getattr(self.cfg, "dataset", ds.attrs.get("source", "unknown"))
    df_long = df_long.drop_duplicates()
    self._gen_fn_cfg()
    return df_long
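
Example (a sketch, assuming an extractor with a computed index; column selection reflects the long-form layout described above):

    idx_ds = extractor.calc_index()
    df = extractor.to_dataframe(idx_ds)
    print(df[["time", "variable", "value", "units", "source"]].head())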

to_nc(self, ds=None, filename=None)

Save an xarray Dataset to NetCDF.

Notes:

- If ds is None: save current_ds.
- If filename is None: use self.filename_nc.
- Creates directories if needed and updates self.filename_nc and self.current_filename.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| ds | xr.Dataset | Dataset to save. If None, uses self.current_ds. | None |
| filename | str | Output filename. Defaults to self.filename_nc. | None |

Returns:

| Type | Description |
|------|-------------|
| str | The path of the written NetCDF file. |

Source code in climdata/utils/wrapper_workflow.py
def to_nc(self, ds: Optional[xr.Dataset] = None, filename: Optional[str] = None) -> str:
    """Save an xarray Dataset to NetCDF.

    Notes:
        - If ``ds`` is ``None``: save ``current_ds``.
        - If ``filename`` is ``None``: use ``self.filename_nc``.
        - Creates directories if needed and updates ``self.filename_nc`` and ``self.current_filename``.

    Args:
        ds (xr.Dataset, optional): Dataset to save. If ``None``, uses ``self.current_ds``.
        filename (str, optional): Output filename. Defaults to ``self.filename_nc``.

    Returns:
        str: The path of the written NetCDF file.
    """

    # -------------------------------
    # 1. Determine dataset to save
    # -------------------------------
    ds = ds or getattr(self, "current_ds", None)
    if ds is None:
        raise ValueError("No dataset available to save")

    # -------------------------------
    # 2. Determine filename
    # -------------------------------
    filename = filename or getattr(self, "filename_nc", None)
    if filename is None:
        raise ValueError("No filename provided and filename_nc is not set")

    path = Path(filename)
    path.parent.mkdir(parents=True, exist_ok=True)

    # -------------------------------
    # 3. Save to NetCDF
    # -------------------------------
    ds.to_netcdf(path)

    # -------------------------------
    # 4. Track filenames
    # -------------------------------
    self.filename_nc = str(path)
    self.current_filename = str(path)

    # print(f"Dataset saved to NetCDF file: {self.current_filename}")
    self.logger.info(f"Dataset saved to NetCDF file: {self.current_filename}")

    return str(path)
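
Example (a sketch, assuming an already constructed extractor instance with a current dataset; the explicit filename is illustrative):

    nc_path = extractor.to_nc()                               # uses the generated filename_nc
    nc_path = extractor.to_nc(filename="out/custom_name.nc")  # or an explicit target path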

upload_csv(self, csv_file)

Load a long-form CSV into an xarray.Dataset.

The CSV must contain time, lat/latitude, lon/longitude, variable, and value columns. Units may be supplied in a units column, and an optional source column is recognized.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| csv_file | str | Path to the CSV file to load. | required |

Returns:

| Type | Description |
|------|-------------|
| xr.Dataset | The converted dataset (also sets self.current_ds). |

Source code in climdata/utils/wrapper_workflow.py
@update_ds(attr_name='ds')
def upload_csv(self, csv_file: str) -> xr.Dataset:
    """Load a long-form CSV into an xarray.Dataset.

    The CSV must contain ``time``, ``lat``/``latitude``, ``lon``/``longitude``, ``variable``, and ``value`` columns. Units may be supplied in a ``units`` column, and an optional ``source`` column is recognized.

    Args:
        csv_file (str): Path to the CSV file to load.

    Returns:
        xr.Dataset: The converted dataset (also sets ``self.current_ds``).
    """
    if not os.path.exists(csv_file):
        raise FileNotFoundError(f"{csv_file} does not exist")

    df = pd.read_csv(csv_file, parse_dates=["time"])

    lat_col = next((c for c in ["lat", "latitude"] if c in df.columns), None)
    lon_col = next((c for c in ["lon", "longitude"] if c in df.columns), None)
    if lat_col is None or lon_col is None:
        raise ValueError("CSV must have 'lat'/'latitude' and 'lon'/'longitude' columns")

    id_vars = ["time", lat_col, lon_col]
    df_wide = df.pivot_table(index=id_vars, columns="variable", values="value").reset_index()
    ds = df_wide.set_index(id_vars).to_xarray()

    # Attach units from CSV
    for var in ds.data_vars:
        units_series = df[df["variable"] == var]["units"]
        ds[var].attrs["units"] = units_series.iloc[0] if not units_series.empty else "unknown"

    # Global source attribute
    if "source" in df.columns:
        source_series = df["source"].dropna().unique()
        if len(source_series) > 0:
            ds.attrs["source"] = source_series[0]

    # Update cfg variables & varinfo
    if not hasattr(self.cfg, "variables") or not self.cfg.variables:
        self.cfg.variables = list(ds.data_vars)
    if not hasattr(self.cfg, "varinfo") or not self.cfg.varinfo:
        self.cfg.varinfo = {v: {"units": ds[v].attrs.get("units", "unknown")} for v in ds.data_vars}
    self._gen_fn(ds=ds)
    return ds
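
Example (a sketch, assuming an already constructed extractor instance; the file name and contents are illustrative of the expected long-form layout):

    # observations.csv
    # time,lat,lon,variable,value,units,source
    # 2020-01-01,50.9,6.4,tas,274.6,K,station
    ds = extractor.upload_csv("observations.csv")
    print(ds.data_vars)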

upload_netcdf(self, nc_file)

Load a NetCDF file into an xarray.Dataset and update file metadata.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| nc_file | str | Path to the NetCDF file to open. | required |

Returns:

| Type | Description |
|------|-------------|
| xr.Dataset | The loaded dataset (also sets self.current_ds). |

Source code in climdata/utils/wrapper_workflow.py
@update_ds(attr_name='ds')
def upload_netcdf(self, nc_file: str) -> xr.Dataset:
    """Load a NetCDF file into an xarray.Dataset and update file metadata.

    Args:
        nc_file (str): Path to the NetCDF file to open.

    Returns:
        xr.Dataset: The loaded dataset (also sets ``self.current_ds``).
    """
    if not os.path.exists(nc_file):
        raise FileNotFoundError(f"{nc_file} does not exist")

    ds = xr.open_dataset(nc_file)

    # Update cfg variables & varinfo
    if not hasattr(self.cfg, "variables") or not self.cfg.variables:
        self.cfg.variables = list(ds.data_vars)
    if not hasattr(self.cfg, "varinfo") or not self.cfg.varinfo:
        self.cfg.varinfo = {v: {"units": ds[v].attrs.get("units", "unknown")}
                            for v in ds.data_vars}
    self._gen_fn(ds=ds)
    return ds
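
Example (a sketch, assuming an already constructed extractor instance; the file name is illustrative):

    ds = extractor.upload_netcdf("era5_subset.nc")
    idx_ds = extractor.calc_index(ds)   # continue the workflow from the uploaded data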