Skip to content

fluxopt.types

Type Aliases:

Name Description
Variate

Any input that varies over a subset of the model's variate dims (time,

Classes:

Name Description
IdList

Frozen, ordered container with access by id (str) or position (int).

Functions:

Name Description
fast_concat

Stack DataArrays along a new leading dimension.

as_dataarray

Convert a Variate to a DataArray aligned to given coordinates.

normalize_timesteps

Normalize user-provided timesteps to an internal time index.

compute_dt

Compute dt (hours) for each timestep as a DataArray.

Variate

Any input that varies over a subset of the model's variate dims (time, optionally period, eventually scenario).

  • Scalar: broadcast to all variate dims.
  • 1-D (list/ndarray): matched to a coord by length (must be unambiguous).
  • 1-D (pd.Series): index name selects the dim if set; else matched by length.
  • 2-D (pd.DataFrame): index.name and columns.name must match target dims.
  • n-D (xr.DataArray): dims must be a subset of the target; coords must match exactly.

Per-field reach (which dims a particular field can vary over) is documented on the field itself; as_dataarray enforces that user input only uses dims the caller declared in coords.

IdList

IdList(items: Iterable[T])

Frozen, ordered container with access by id (str) or position (int).

Supports concatenation via +.

Parameters:

Name Type Description Default

items

Iterable[T]

Elements to store. Must have unique ids.

required

Raises:

Type Description
ValueError

On duplicate ids.

Source code in src/fluxopt/types.py
def __init__(self, items: Iterable[T]) -> None:
    self._items: tuple[T, ...] = tuple(items)
    self._by_id: dict[str, T] = {}
    for item in self._items:
        if item.id in self._by_id:
            raise ValueError(f"Duplicate id: '{item.id}'")
        self._by_id[item.id] = item

fast_concat

fast_concat(arrays: list[DataArray], dim: Index) -> DataArray

Stack DataArrays along a new leading dimension.

Drop-in replacement for xr.concat when all slices already share the same dims, shape, and coords. Skips alignment, deepcopy, and reindex — just stacks the underlying numpy arrays.

Parameters:

Name Type Description Default

arrays

list[DataArray]

DataArrays with identical dims, shape, and coords.

required

dim

Index

Index for the new leading dimension.

required

Raises:

Type Description
ValueError

If arrays is empty or any slice has a different shape or dims than the first.

Source code in src/fluxopt/types.py
def fast_concat(arrays: list[xr.DataArray], dim: pd.Index) -> xr.DataArray:
    """Stack DataArrays along a new leading dimension.

    Drop-in replacement for ``xr.concat`` when all slices already share the
    same dims, shape, and coords. Skips alignment, deepcopy, and reindex —
    just stacks the underlying numpy arrays.

    Args:
        arrays: DataArrays with identical dims, shape, and coords.
        dim: Index for the new leading dimension.

    Raises:
        ValueError: If *arrays* is empty or any slice has a different shape or dims than the first.
    """
    if not arrays:
        raise ValueError("fast_concat: 'arrays' must not be empty")
    first = arrays[0]
    expected_shape = first.shape
    expected_dims = first.dims
    for i, a in enumerate(arrays[1:], 1):
        if a.shape != expected_shape:
            raise ValueError(f'fast_concat: slice {i} shape {a.shape} != expected {expected_shape}')
        if a.dims != expected_dims:
            raise ValueError(f'fast_concat: slice {i} dims {a.dims} != expected {expected_dims}')
    data = np.array([a.values for a in arrays])
    name = str(dim.name)
    dims = [name, *expected_dims]
    coords: dict[str, object] = {name: dim}
    for d in expected_dims:
        key = str(d)
        if key in first.coords:
            coords[key] = first.coords[key]
    return xr.DataArray(data, dims=dims, coords=coords)

as_dataarray

as_dataarray(
    value: Variate,
    coords: Mapping[str, Any],
    *,
    name: str = 'value',
    broadcast: bool = True,
) -> DataArray

Convert a Variate to a DataArray aligned to given coordinates.

Pipeline: convert → validate dims → validate coord values → broadcast.

See :data:Variate for accepted inputs. Pandas inputs (Series, DataFrame) follow the same convention as linopy.as_dataarray: the axis name attribute selects the corresponding target dim. For ndarray/list, the dim is selected by length (must be unambiguous). For DataArray, dims must be a subset of coords and coord values must match exactly — alignment errors are surfaced loudly, not silently masked.

Parameters:

Name Type Description Default

value

Variate

Scalar, list, ndarray, Series, DataFrame, or DataArray.

required

coords

Mapping[str, Any]

Target coordinates, e.g. {"time": idx, "period": pidx}. Used both as the reach declaration and as alignment targets.

required

name

str

Name for the resulting DataArray.

'value'

broadcast

bool

Expand result to span all dimensions in coords.

True
Source code in src/fluxopt/types.py
def as_dataarray(
    value: Variate,
    coords: Mapping[str, Any],
    *,
    name: str = 'value',
    broadcast: bool = True,
) -> xr.DataArray:
    """Convert a Variate to a DataArray aligned to given coordinates.

    Pipeline: ``convert → validate dims → validate coord values → broadcast``.

    See :data:`Variate` for accepted inputs. Pandas inputs (``Series``,
    ``DataFrame``) follow the same convention as ``linopy.as_dataarray``: the
    axis ``name`` attribute selects the corresponding target dim. For
    ``ndarray``/``list``, the dim is selected by length (must be unambiguous).
    For ``DataArray``, dims must be a subset of *coords* and coord values must
    match exactly — alignment errors are surfaced loudly, not silently masked.

    Args:
        value: Scalar, list, ndarray, Series, DataFrame, or DataArray.
        coords: Target coordinates, e.g. ``{"time": idx, "period": pidx}``.
            Used both as the reach declaration and as alignment targets.
        name: Name for the resulting DataArray.
        broadcast: Expand result to span all dimensions in *coords*.
    """
    coord_idx = {k: v if isinstance(v, pd.Index) else pd.Index(v) for k, v in coords.items()}

    # --- scalar: 0-dim unless broadcast ---
    if isinstance(value, (int, float)):
        if not broadcast:
            return xr.DataArray(float(value), name=name)
        shape = tuple(len(v) for v in coord_idx.values())
        return xr.DataArray(
            np.full(shape, float(value)),
            dims=list(coord_idx),
            coords=coord_idx,
            name=name,
        )

    # --- 1) Convert to DataArray ---
    da: xr.DataArray
    if isinstance(value, xr.DataArray):
        da = value
    elif isinstance(value, (pd.Series, pd.DataFrame)):
        # Mirror linopy: pandas axes already carry coords; use axis.name as dim.
        # Fall back to length-matching only when no axis is named.
        named = [a.name for a in value.axes if a.name is not None]
        if len(named) == value.ndim:
            da = xr.DataArray(value)
        elif value.ndim == 1 and not named:
            return _from_unnamed_1d(np.asarray(value.values, dtype=float), coord_idx, name, broadcast)
        else:
            raise ValueError(
                f'{type(value).__name__} requires axis.name set on every axis '
                f'(got {[a.name for a in value.axes]!r}). '
                f"Set e.g. df.index.name='time', df.columns.name='period'."
            )
    elif isinstance(value, np.ndarray):
        if value.ndim != 1:
            raise ValueError(
                f'np.ndarray must be 1-D (got ndim={value.ndim}); pass an xr.DataArray '
                f'or pd.DataFrame with named axes for higher-dim inputs.'
            )
        return _from_unnamed_1d(value, coord_idx, name, broadcast)
    elif isinstance(value, list):
        return _from_unnamed_1d(np.asarray(value, dtype=float), coord_idx, name, broadcast)
    else:
        raise TypeError(f'Unsupported Variate type: {type(value)}')

    # --- 2) Validate dims are a subset of the target ---
    foreign = [str(d) for d in da.dims if d not in coord_idx]
    if foreign:
        raise ValueError(
            f'{type(value).__name__} has dims {foreign} not in target coords {list(coord_idx)}. '
            f'Rename before calling as_dataarray().'
        )

    # --- 3) Validate coord values match exactly (close the alignment gap) ---
    for d in da.dims:
        dim_name = str(d)
        if d in da.coords and not pd.Index(da.coords[d].values).equals(coord_idx[dim_name]):
            raise ValueError(
                f'Coord mismatch on dim {dim_name!r}: input coord does not equal target. '
                f"Use the same index as the model's {dim_name}."
            )

    da = da.rename(name)
    if broadcast:
        for dim, idx in coord_idx.items():
            if dim not in da.dims:
                da = da.expand_dims({dim: idx})
        da = da.transpose(*coord_idx)
    return da

normalize_timesteps

normalize_timesteps(timesteps: Timesteps) -> TimeIndex

Normalize user-provided timesteps to an internal time index.

Parameters:

Name Type Description Default

timesteps

Timesteps

Datetime objects, integers, or a DatetimeIndex.

required

Returns:

Type Description
TimeIndex

A datetime index for datetime inputs, or an integer index for integer inputs.

Raises:

Type Description
ValueError

If timesteps are not strictly monotonically increasing.

Source code in src/fluxopt/types.py
def normalize_timesteps(timesteps: Timesteps) -> TimeIndex:
    """Normalize user-provided timesteps to an internal time index.

    Args:
        timesteps: Datetime objects, integers, or a DatetimeIndex.

    Returns:
        A datetime index for datetime inputs, or an integer index for integer inputs.

    Raises:
        ValueError: If timesteps are not strictly monotonically increasing.
    """
    if len(timesteps) == 0:
        raise ValueError('Timesteps must not be empty')

    if isinstance(timesteps, pd.DatetimeIndex):
        idx: TimeIndex = timesteps
    elif isinstance(timesteps, pd.Index):
        if isinstance(timesteps, pd.RangeIndex) or pd.api.types.is_integer_dtype(timesteps.dtype):
            idx = timesteps
        elif pd.api.types.is_datetime64_any_dtype(timesteps.dtype):
            idx = pd.DatetimeIndex(timesteps)
        else:
            raise TypeError(f'Unsupported pd.Index dtype: {timesteps.dtype}. Use datetime or integer index.')
    elif not isinstance(timesteps, list):
        raise TypeError(f'Unsupported Timesteps type: {type(timesteps)}')
    elif isinstance(timesteps[0], datetime):
        idx = pd.DatetimeIndex(timesteps)
    elif type(timesteps[0]) is int:
        idx = pd.Index(timesteps)
        if not pd.api.types.is_integer_dtype(idx.dtype):
            raise TypeError('Integer timesteps contain non-integer values')
    else:
        raise TypeError(f'Unsupported timestep element type: {type(timesteps[0])}. Use datetime or int.')

    if len(idx) > 1 and not idx.is_monotonic_increasing:
        raise ValueError('Timesteps must be strictly monotonically increasing')
    if not idx.is_unique:
        raise ValueError('Timesteps contain duplicates')
    return idx

compute_dt

compute_dt(timesteps: TimeIndex, dt: float | list[float] | None) -> DataArray

Compute dt (hours) for each timestep as a DataArray.

When dt is None, auto-derives from timesteps: - Datetime: consecutive differences in hours; first = second (forward-looking). - Integer: 1.0 for all. - Single timestep: 1.0.

Parameters:

Name Type Description Default

timesteps

TimeIndex

Time index.

required

dt

float | list[float] | None

Override timestep duration. Validated against timesteps length.

required
Source code in src/fluxopt/types.py
def compute_dt(timesteps: TimeIndex, dt: float | list[float] | None) -> xr.DataArray:
    """Compute dt (hours) for each timestep as a DataArray.

    When dt is None, auto-derives from timesteps:
    - Datetime: consecutive differences in hours; first = second (forward-looking).
    - Integer: 1.0 for all.
    - Single timestep: 1.0.

    Args:
        timesteps: Time index.
        dt: Override timestep duration. Validated against timesteps length.
    """
    n = len(timesteps)

    if dt is not None:
        if isinstance(dt, (int, float)):
            values = np.full(n, float(dt))
        elif isinstance(dt, list):
            if len(dt) != n:
                raise ValueError(f'dt length {len(dt)} does not match timesteps length {n}')
            values = np.array(dt, dtype=float)
        else:
            raise TypeError(f'Unsupported dt type: {type(dt)}')
        return xr.DataArray(values, dims=['time'], coords={'time': timesteps}, name='dt')

    # Auto-derive
    if n <= 1:
        return xr.DataArray(np.ones(n), dims=['time'], coords={'time': timesteps}, name='dt')

    if not isinstance(timesteps, pd.DatetimeIndex):
        # Integer timesteps: default to 1.0
        return xr.DataArray(np.ones(n), dims=['time'], coords={'time': timesteps}, name='dt')

    # Datetime: derive from diff in hours
    diffs = np.diff(timesteps.values) / np.timedelta64(1, 'h')
    dt_values = np.empty(n)
    dt_values[0] = diffs[0]
    dt_values[1:] = diffs
    return xr.DataArray(dt_values, dims=['time'], coords={'time': timesteps}, name='dt')