Skip to content

fluxopt.model_data

Classes:

Name Description
FlowsData
CarriersData
ConvertersData
PiecewiseData

Piecewise-linear conversion data for converters with PiecewiseConversion.

EffectsData
StoragesData
Dims

Shared model coordinates and temporal metadata.

ModelData

FlowsData dataclass

FlowsData(
    bound_type: DataArray,
    rel_lb: DataArray,
    rel_ub: DataArray,
    fixed_profile: DataArray,
    size: DataArray,
    effect_coeff: DataArray,
    sizing_min: DataArray | None = None,
    sizing_max: DataArray | None = None,
    sizing_mandatory: DataArray | None = None,
    sizing_effects_per_size: DataArray | None = None,
    sizing_effects_fixed: DataArray | None = None,
    status_min_uptime: DataArray | None = None,
    status_max_uptime: DataArray | None = None,
    status_min_downtime: DataArray | None = None,
    status_max_downtime: DataArray | None = None,
    status_initial: DataArray | None = None,
    status_effects_running: DataArray | None = None,
    status_effects_startup: DataArray | None = None,
    status_previous_uptime: DataArray | None = None,
    status_previous_downtime: DataArray | None = None,
    invest_min: DataArray | None = None,
    invest_max: DataArray | None = None,
    invest_mandatory: DataArray | None = None,
    invest_lifetime: DataArray | None = None,
    invest_prior_size: DataArray | None = None,
    invest_effects_per_size_at_build: DataArray | None = None,
    invest_effects_fixed_at_build: DataArray | None = None,
    invest_effects_per_size_recurring: DataArray | None = None,
    invest_effects_fixed_recurring: DataArray | None = None,
    cstatus_min_uptime: DataArray | None = None,
    cstatus_max_uptime: DataArray | None = None,
    cstatus_min_downtime: DataArray | None = None,
    cstatus_max_downtime: DataArray | None = None,
    cstatus_initial: DataArray | None = None,
    cstatus_effects_running: DataArray | None = None,
    cstatus_effects_startup: DataArray | None = None,
    cstatus_previous_uptime: DataArray | None = None,
    cstatus_previous_downtime: DataArray | None = None,
    cstatus_governed_flows: DataArray | None = None,
)

Methods:

Name Description
__post_init__

Validate relative bounds: non-negative and lb <= ub.

to_dataset

Serialize to xr.Dataset.

from_dataset

Deserialize from xr.Dataset.

build

Build FlowsData from element objects.

__post_init__

__post_init__() -> None

Validate relative bounds: non-negative and lb <= ub.

Source code in src/fluxopt/model_data.py
def __post_init__(self) -> None:
    """Check the relative flow bounds once the dataclass is populated.

    Every dimension of ``rel_lb`` other than ``'flow'`` is collapsed, so a
    flow is reported as soon as a single entry breaks a rule. Both
    comparisons use a tolerance of ``1e-12``.

    Raises:
        ValueError: If a flow has a negative lower bound, or a lower bound
            above its upper bound.
    """
    lower = self.rel_lb
    non_flow_dims = [dim for dim in lower.dims if dim != 'flow']

    negative = (lower < -1e-12).any(non_flow_dims)
    if negative.any():
        offenders = list(self.rel_lb.coords["flow"][negative].values)
        raise ValueError(f'Negative lower bounds on flows: {offenders}')

    inverted = (lower > self.rel_ub + 1e-12).any(non_flow_dims)
    if inverted.any():
        offenders = list(self.rel_lb.coords["flow"][inverted].values)
        raise ValueError(f'Lower bound > upper bound on flows: {offenders}')

to_dataset

to_dataset() -> Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize this object by handing it to ``_to_dataset``."""
    dataset = _to_dataset(self)
    return dataset

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default

ds

Dataset

Dataset with matching variable names.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Rebuild an instance from an ``xr.Dataset``.

    Each dataclass field is looked up in ``ds`` by name; a field without a
    matching entry is handed to the constructor as ``None``.

    Args:
        ds: Dataset with matching variable names.
    """
    kwargs: dict[str, Any] = {}
    for spec in fields(cls):
        kwargs[spec.name] = ds.get(spec.name)
    return cls(**kwargs)

build classmethod

build(
    flows: list[Flow],
    time: TimeIndex,
    effects: list[Effect],
    dt: float = 1.0,
    period: Index | None = None,
    component_status_items: list[tuple[str, Status, list[str]]] | None = None,
) -> Self

Build FlowsData from element objects.

Parameters:

Name Type Description Default

flows

list[Flow]

All collected flows with qualified ids.

required

time

TimeIndex

Time index.

required

effects

list[Effect]

Effect definitions for cost coefficients.

required

dt

float

Scalar timestep duration in hours for prior duration computation.

1.0

period

Index | None

Period index for multi-period models. When provided, effect_coeff, rel_lb, rel_ub and fixed_profile gain a period dimension so that effects_per_flow_hour, relative_minimum, relative_maximum and fixed_relative_profile can vary across periods.

None

component_status_items

list[tuple[str, Status, list[str]]] | None

Component-level status entries as (component_id, Status, [governed flow ids]). Each entry produces an on/startup/shutdown binary keyed by the component, gating all listed flows together.

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    flows: list[Flow],
    time: TimeIndex,
    effects: list[Effect],
    dt: float = 1.0,
    period: pd.Index | None = None,
    component_status_items: list[tuple[str, Status, list[str]]] | None = None,
) -> Self:
    """Build FlowsData from element objects.

    Args:
        flows: All collected flows with qualified ids.
        time: Time index.
        effects: Effect definitions for cost coefficients.
        dt: Scalar timestep duration in hours for prior duration computation.
        period: Period index for multi-period models. When provided,
            ``effect_coeff``, ``rel_lb``, ``rel_ub`` and ``fixed_profile``
            gain a ``period`` dimension so that ``effects_per_flow_hour``,
            ``relative_minimum``, ``relative_maximum`` and
            ``fixed_relative_profile`` can vary across periods.
        component_status_items: Component-level status entries as
            ``(component_id, Status, [governed flow ids])``. Each entry
            produces an on/startup/shutdown binary keyed by the
            component, gating all listed flows together.

    Raises:
        ValueError: If a flow's ``effects_per_flow_hour`` references an
            effect id that is not present in ``effects``.
    """
    from fluxopt.elements import Investment, Sizing

    flow_ids = [f.id for f in flows]
    effect_ids = [e.id for e in effects]
    effect_set = set(effect_ids)

    bound_type: list[str] = []
    rel_lbs: list[xr.DataArray] = []
    rel_ubs: list[xr.DataArray] = []
    profiles: list[xr.DataArray] = []
    # NaN marks flows whose size is not a plain number (unsized, Sizing, Investment).
    size_vals = np.full(len(flows), np.nan)
    effect_coeffs: list[xr.DataArray] = []
    sizing_items: list[tuple[str, Sizing]] = []
    invest_items: list[tuple[str, Investment]] = []
    status_items: list[tuple[str, Status]] = []
    prior_rates_map: dict[str, list[float]] = {}

    # Coordinates shared by every per-flow time series: time, plus period if given.
    envelope_coords: dict[str, Any] = {'time': time}
    if period is not None:
        envelope_coords['period'] = period
    # Placeholder profile for flows without a fixed_relative_profile.
    nan_envelope = xr.DataArray(
        np.full([len(v) for v in envelope_coords.values()], np.nan),
        dims=list(envelope_coords),
        coords=envelope_coords,
    )

    # The layout of a flow's effect-coefficient array is the same for every
    # flow, so it is derived once here instead of on each loop iteration.
    ec_coords: dict[str, Any] = {'effect': effect_ids, **envelope_coords}
    ec_dims = list(ec_coords)
    ec_shape = [len(v) for v in ec_coords.values()]

    for i, f in enumerate(flows):
        rel_lbs.append(as_dataarray(f.relative_minimum, envelope_coords))
        rel_ubs.append(as_dataarray(f.relative_maximum, envelope_coords))

        # Sizing / Investment objects are collected for their own array
        # builders; a plain size is stored directly.
        if isinstance(f.size, Sizing):
            sizing_items.append((f.id, f.size))
        elif isinstance(f.size, Investment):
            invest_items.append((f.id, f.size))
        elif f.size is not None:
            size_vals[i] = f.size

        # A fixed profile takes precedence; otherwise the flow is classified
        # by whether it has any size at all.
        if f.fixed_relative_profile is not None:
            profiles.append(as_dataarray(f.fixed_relative_profile, envelope_coords))
            bound_type.append('profile')
        elif f.size is None:
            profiles.append(nan_envelope)
            bound_type.append('unsized')
        else:
            profiles.append(nan_envelope)
            bound_type.append('bounded')

        # Effect coefficients for this flow: a fresh zero array per flow,
        # because it is filled in place below.
        ec = xr.DataArray(np.zeros(ec_shape), dims=ec_dims, coords=ec_coords)
        for effect_label, factor in f.effects_per_flow_hour.items():
            if effect_label not in effect_set:
                raise ValueError(f'Unknown effect {effect_label!r} in Flow.effects_per_flow_hour on {f.id!r}')
            ec.loc[effect_label] = as_dataarray(factor, envelope_coords)
        effect_coeffs.append(ec)

        if f.status is not None:
            status_items.append((f.id, f.status))

        if f.prior_rates is not None:
            prior_rates_map[f.id] = f.prior_rates

    flow_idx = pd.Index(flow_ids, name='flow')
    sz = _SizingArrays.build(sizing_items, effect_ids, dim='sizing_flow', period=period)
    inv = _InvestmentArrays.build(invest_items, effect_ids, dim='invest_flow', period=period)
    st = _StatusArrays.build(
        status_items, effect_ids, time, dim='status_flow', prior_rates_map=prior_rates_map, dt=dt, period=period
    )

    # Component-level status entries are keyed by component id rather than flow id.
    comp_items = component_status_items or []
    cst = _StatusArrays.build(
        [(cid, s) for cid, s, _ in comp_items],
        effect_ids,
        time,
        dim='cstatus_component',
        period=period,
        governed_flows_map={cid: gov for cid, _, gov in comp_items} or None,
    )

    return cls(
        bound_type=xr.DataArray(bound_type, dims=['flow'], coords={'flow': flow_ids}),
        rel_lb=fast_concat(rel_lbs, flow_idx),
        rel_ub=fast_concat(rel_ubs, flow_idx),
        fixed_profile=fast_concat(profiles, flow_idx),
        size=xr.DataArray(size_vals, dims=['flow'], coords={'flow': flow_ids}),
        effect_coeff=fast_concat(effect_coeffs, flow_idx),
        sizing_min=sz.min,
        sizing_max=sz.max,
        sizing_mandatory=sz.mandatory,
        sizing_effects_per_size=sz.effects_per_size,
        sizing_effects_fixed=sz.effects_fixed,
        status_min_uptime=st.min_uptime,
        status_max_uptime=st.max_uptime,
        status_min_downtime=st.min_downtime,
        status_max_downtime=st.max_downtime,
        status_initial=st.initial,
        status_effects_running=st.effects_running,
        status_effects_startup=st.effects_startup,
        status_previous_uptime=st.previous_uptime,
        status_previous_downtime=st.previous_downtime,
        invest_min=inv.min,
        invest_max=inv.max,
        invest_mandatory=inv.mandatory,
        invest_lifetime=inv.lifetime,
        invest_prior_size=inv.prior_size,
        invest_effects_per_size_at_build=inv.effects_per_size_at_build,
        invest_effects_fixed_at_build=inv.effects_fixed_at_build,
        invest_effects_per_size_recurring=inv.effects_per_size_recurring,
        invest_effects_fixed_recurring=inv.effects_fixed_recurring,
        cstatus_min_uptime=cst.min_uptime,
        cstatus_max_uptime=cst.max_uptime,
        cstatus_min_downtime=cst.min_downtime,
        cstatus_max_downtime=cst.max_downtime,
        cstatus_initial=cst.initial,
        cstatus_effects_running=cst.effects_running,
        cstatus_effects_startup=cst.effects_startup,
        cstatus_previous_uptime=cst.previous_uptime,
        cstatus_previous_downtime=cst.previous_downtime,
        cstatus_governed_flows=cst.governed_flows,
    )

CarriersData dataclass

CarriersData(
    flow_coeff: DataArray,
    unit: DataArray,
    color: DataArray,
    description: DataArray,
)

Methods:

Name Description
to_dataset

Serialize to xr.Dataset.

from_dataset

Deserialize from xr.Dataset.

build

Build CarriersData from explicit carrier declarations.

to_dataset

to_dataset() -> Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Convert the carrier arrays into a dataset using ``_to_dataset``."""
    dataset = _to_dataset(self)
    return dataset

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default

ds

Dataset

Dataset with flow_coeff, unit, color, description.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Rebuild an instance from the four carrier variables of a dataset.

    Args:
        ds: Dataset with ``flow_coeff``, ``unit``, ``color``, ``description``.
    """
    names = ('flow_coeff', 'unit', 'color', 'description')
    return cls(**{name: ds[name] for name in names})

build classmethod

Build CarriersData from explicit carrier declarations.

Parameters:

Name Type Description Default

carriers

list[Carrier]

Declared carriers.

required

flows

list[Flow]

All collected flows.

required

carrier_coeff

dict[str, float]

Mapping of flow id to +1 (produces) or -1 (consumes).

required
Source code in src/fluxopt/model_data.py
@classmethod
def build(cls, carriers: list[Carrier], flows: list[Flow], carrier_coeff: dict[str, float]) -> Self:
    """Build CarriersData from explicit carrier declarations.

    A carrier with nodes contributes one entry per node to the ``carrier``
    dimension (named via ``node_id``); a carrier without nodes contributes a
    single entry under its own id. Unit, color and description are repeated
    so they line up with those entries.

    Args:
        carriers: Declared carriers.
        flows: All collected flows.
        carrier_coeff: Mapping of flow id to +1 (produces) or -1 (consumes).

    Raises:
        ValueError: If a flow resolves to a carrier dim id that none of the
            declared carriers provides.
        KeyError: If ``carrier_coeff`` has no entry for a flow id.
    """
    from fluxopt.elements import node_id

    # Expand ids and metadata in one pass so the lists cannot drift apart.
    carrier_ids: list[str] = []
    units: list[str] = []
    colors: list[str] = []
    descriptions: list[str] = []
    for c in carriers:
        ids = [node_id(c.id, node) for node in c.nodes] if c.nodes else [c.id]
        carrier_ids.extend(ids)
        units.extend([c.unit] * len(ids))
        colors.extend([c.color or ''] * len(ids))
        descriptions.extend([c.description] * len(ids))

    flow_ids = [f.id for f in flows]

    # Position lookups replace repeated list.index scans. setdefault keeps the
    # first occurrence of a duplicated id, which is what list.index returned.
    carrier_pos: dict[str, int] = {}
    for pos, cid in enumerate(carrier_ids):
        carrier_pos.setdefault(cid, pos)
    flow_pos: dict[str, int] = {}
    for pos, fid in enumerate(flow_ids):
        flow_pos.setdefault(fid, pos)

    # NaN marks (carrier, flow) pairs that are not connected.
    coeff = np.full((len(carrier_ids), len(flow_ids)), np.nan)
    for f in flows:
        dim_id = _carrier_dim_id(f)
        if dim_id not in carrier_pos:
            raise ValueError(f'Flow {f.id!r} refers to undeclared carrier {dim_id!r}')
        coeff[carrier_pos[dim_id], flow_pos[f.id]] = carrier_coeff[f.id]

    return cls(
        flow_coeff=xr.DataArray(coeff, dims=['carrier', 'flow'], coords={'carrier': carrier_ids, 'flow': flow_ids}),
        unit=xr.DataArray(units, dims=['carrier'], coords={'carrier': carrier_ids}),
        color=xr.DataArray(colors, dims=['carrier'], coords={'carrier': carrier_ids}),
        description=xr.DataArray(descriptions, dims=['carrier'], coords={'carrier': carrier_ids}),
    )

ConvertersData dataclass

ConvertersData(
    pair_coeff: DataArray,
    pair_converter: DataArray,
    pair_flow: DataArray,
    eq_mask: DataArray,
)

Methods:

Name Description
to_dataset

Serialize to xr.Dataset.

from_dataset

Deserialize from xr.Dataset.

build

Build ConvertersData with sparse pair-based conversion coefficients.

Attributes:

Name Type Description
flow_coeff DataArray

Dense (converter, eq_idx, flow, time) view for inspection.

flow_coeff property

flow_coeff: DataArray

Dense (converter, eq_idx, flow, time) view for inspection.

to_dataset

to_dataset() -> Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Produce the dataset form of this object through ``_to_dataset``."""
    dataset = _to_dataset(self)
    return dataset

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default

ds

Dataset

Dataset with pair-based converter coefficient variables.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Rebuild an instance from the pair-based variables of a dataset.

    Args:
        ds: Dataset with pair-based converter coefficient variables.
    """
    names = ('pair_coeff', 'pair_converter', 'pair_flow', 'eq_mask')
    return cls(**{name: ds[name] for name in names})

build classmethod

build(converters: list[Converter], time: TimeIndex) -> Self | None

Build ConvertersData with sparse pair-based conversion coefficients.

Only linear converters are included; piecewise converters (conversion is not None) live in PiecewiseData.

Parameters:

Name Type Description Default

converters

list[Converter]

Converter definitions.

required

time

TimeIndex

Time index.

required
Source code in src/fluxopt/model_data.py
@classmethod
def build(cls, converters: list[Converter], time: TimeIndex) -> Self | None:
    """Assemble sparse, pair-based conversion coefficients for linear converters.

    Converters whose ``conversion`` is set are ignored here (they belong to
    :class:`PiecewiseData`). Returns ``None`` when no linear converter remains.
    Each (converter, flow) pair gets an ``(eq_idx, time)`` coefficient block
    that stays zero wherever an equation does not mention the flow.

    Args:
        converters: Converter definitions.
        time: Time index.
    """
    linear = [conv for conv in converters if conv.conversion is None]
    if not linear:
        return None

    n_steps = len(time)
    max_eq = max(len(conv.conversion_factors) for conv in linear)
    eq_positions = list(range(max_eq))

    masks: list[np.ndarray] = []
    pair_converters: list[str] = []
    pair_flows: list[str] = []
    pair_blocks: list[np.ndarray] = []

    for conv in linear:
        # True for the equation slots this converter defines, False for padding.
        masks.append(np.arange(max_eq) < len(conv.conversion_factors))

        # Equations are keyed by short name, flows carry qualified ids.
        short_by_qid = {qid: short for short, qid in conv._short_to_id.items()}
        for flow in (*conv.inputs, *conv.outputs):
            short = short_by_qid[flow.id]
            block = np.zeros((max_eq, n_steps))
            for row, equation in enumerate(conv.conversion_factors):
                if short in equation:
                    block[row] = as_dataarray(equation[short], {'time': time}).values
            pair_converters.append(conv.id)
            pair_flows.append(flow.id)
            pair_blocks.append(block)

    pair_coeff = xr.DataArray(
        np.array(pair_blocks),
        dims=['pair', 'eq_idx', 'time'],
        coords={'eq_idx': eq_positions, 'time': time},
    )
    eq_mask = xr.DataArray(
        np.array(masks),
        dims=['converter', 'eq_idx'],
        coords={'converter': [conv.id for conv in linear], 'eq_idx': eq_positions},
    )
    return cls(
        pair_coeff=pair_coeff,
        pair_converter=xr.DataArray(pair_converters, dims=['pair']),
        pair_flow=xr.DataArray(pair_flows, dims=['pair']),
        eq_mask=eq_mask,
    )

PiecewiseData dataclass

PiecewiseData(
    breakpoints: DataArray,
    pair_converter: DataArray,
    pair_flow: DataArray,
    pair_bound: DataArray,
    method: DataArray,
    availability: DataArray,
    has_status: DataArray,
)

Piecewise-linear conversion data for converters with PiecewiseConversion.

Stored sparsely as one row per (converter, flow) pair; the method and availability arrays index by pw_converter.

Methods:

Name Description
to_dataset

Serialize to xr.Dataset.

from_dataset

Deserialize from xr.Dataset.

converter_ids

Return list of piecewise converter ids in original order.

build

Build PiecewiseData from converters with PiecewiseConversion.

to_dataset

to_dataset() -> Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Return the dataset that ``_to_dataset`` builds from this object."""
    dataset = _to_dataset(self)
    return dataset

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default

ds

Dataset

Dataset with piecewise variables.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Rebuild an instance from the piecewise variables of a dataset.

    Args:
        ds: Dataset with piecewise variables.
    """
    names = (
        'breakpoints',
        'pair_converter',
        'pair_flow',
        'pair_bound',
        'method',
        'availability',
        'has_status',
    )
    return cls(**{name: ds[name] for name in names})

converter_ids

converter_ids() -> list[str]

Return list of piecewise converter ids in original order.

Source code in src/fluxopt/model_data.py
def converter_ids(self) -> list[str]:
    """List the ``pw_converter`` coordinate values of ``method`` in stored order."""
    coord = self.method.coords['pw_converter']
    return [*coord.values]

build classmethod

build(converters: list[Converter], time: TimeIndex) -> Self | None

Build PiecewiseData from converters with PiecewiseConversion.

Parameters:

Name Type Description Default

converters

list[Converter]

Converter definitions; only those with conversion is not None are processed.

required

time

TimeIndex

Time index for breakpoint and availability arrays.

required
Source code in src/fluxopt/model_data.py
@classmethod
def build(cls, converters: list[Converter], time: TimeIndex) -> Self | None:
    """Assemble PiecewiseData from the converters that define a ``conversion``.

    Returns ``None`` when no converter has one. Otherwise one ``pw_pair`` row
    is produced per item yielded by the curve's ``_iter_normalized()``, and
    one ``pw_converter`` entry per converter.

    Args:
        converters: Converter definitions; only those with
            ``conversion is not None`` are processed.
        time: Time index for breakpoint and availability arrays.
    """
    piecewise = [conv for conv in converters if conv.conversion is not None]
    if not piecewise:
        return None

    # Per-converter columns.
    converter_names: list[str] = []
    curve_methods: list[str] = []
    availability_parts: list[xr.DataArray] = []
    status_flags: list[bool] = []

    # Per-(converter, flow) columns.
    owner_of_pair: list[str] = []
    flow_of_pair: list[str] = []
    bound_of_pair: list[str] = []
    breakpoint_parts: list[xr.DataArray] = []

    for conv in piecewise:
        curve = conv.conversion
        assert curve is not None
        converter_names.append(conv.id)
        curve_methods.append(curve.method)
        availability_parts.append(as_dataarray(curve.availability, {'time': time}))
        status_flags.append(curve.status is not None)

        for short, pts, bound in curve._iter_normalized():
            flow_qid = conv._short_to_id[short]
            per_point = [as_dataarray(pt, {'time': time}) for pt in pts]
            point_index = pd.Index(range(len(per_point)), name='breakpoint')
            stacked = fast_concat(per_point, point_index)
            owner_of_pair.append(conv.id)
            flow_of_pair.append(flow_qid)
            bound_of_pair.append(bound)
            breakpoint_parts.append(stacked)

    all_breakpoints = fast_concat(
        breakpoint_parts,
        pd.Index(range(len(breakpoint_parts)), name='pw_pair'),
    )
    all_availability = fast_concat(
        availability_parts,
        pd.Index(converter_names, name='pw_converter'),
    )

    result = cls(
        breakpoints=all_breakpoints,
        pair_converter=xr.DataArray(owner_of_pair, dims=['pw_pair']),
        pair_flow=xr.DataArray(flow_of_pair, dims=['pw_pair']),
        pair_bound=xr.DataArray(bound_of_pair, dims=['pw_pair']),
        method=xr.DataArray(curve_methods, dims=['pw_converter'], coords={'pw_converter': converter_names}),
        availability=all_availability,
        has_status=xr.DataArray(status_flags, dims=['pw_converter'], coords={'pw_converter': converter_names}),
    )
    result._warn_redundant_status()
    return result

EffectsData dataclass

EffectsData(
    min_bound: DataArray,
    max_bound: DataArray,
    min_per_period: DataArray,
    max_per_period: DataArray,
    min_per_hour: DataArray,
    max_per_hour: DataArray,
    cf_temporal: DataArray | None = None,
    period_weights: DataArray | None = None,
)

Methods:

Name Description
to_dataset

Serialize to xr.Dataset.

from_dataset

Deserialize from xr.Dataset.

build

Build EffectsData from element objects.

to_dataset

to_dataset() -> Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize the effect arrays by calling ``_to_dataset`` on this object."""
    dataset = _to_dataset(self)
    return dataset

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default

ds

Dataset

Dataset with effect variables and attrs.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Rebuild an instance from a dataset's variables and attributes.

    For each dataclass field, a data variable of that name is preferred,
    then a dataset attribute of that name. Fields found in neither place
    are left out so the dataclass default applies.

    Args:
        ds: Dataset with effect variables and attrs.
    """
    kwargs: dict[str, object] = {}
    for spec in fields(cls):
        name = spec.name
        if name in ds.data_vars:
            kwargs[name] = ds[name]
            continue
        if name in ds.attrs:
            kwargs[name] = ds.attrs[name]
    return cls(**kwargs)  # pyrefly: ignore[bad-argument-type]

build classmethod

build(
    effects: list[Effect], time: TimeIndex, period: Index | None = None
) -> Self

Build EffectsData from element objects.

Parameters:

Name Type Description Default

effects

list[Effect]

Effect definitions.

required

time

TimeIndex

Time index.

required

period

Index | None

Period index (multi-period only).

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    effects: list[Effect],
    time: TimeIndex,
    period: pd.Index | None = None,
) -> Self:
    """Build EffectsData from element objects.

    Scalar bounds that are not set on an effect are stored as NaN, and
    per-hour bounds that are not set become all-NaN time series.
    ``cf_temporal`` is only built when at least one effect declares
    ``contribution_from``; ``period_weights`` only when ``period`` is given
    and at least one effect declares weights.

    Args:
        effects: Effect definitions.
        time: Time index.
        period: Period index (multi-period only).

    Raises:
        ValueError: If an effect lists itself in ``contribution_from``,
            references an unknown effect, takes part in a circular
            contribution chain, or has ``period_weights`` of the wrong
            length or with non-positive / non-finite values.
    """
    effect_ids = [e.id for e in effects]
    effect_set = set(effect_ids)
    n = len(effects)
    n_time = len(time)
    min_bound = np.full(n, np.nan)
    max_bound = np.full(n, np.nan)
    min_per_period = np.full(n, np.nan)
    max_per_period = np.full(n, np.nan)
    min_per_hours: list[xr.DataArray] = []
    max_per_hours: list[xr.DataArray] = []

    # Stand-in series for effects without a per-hour bound.
    nan_time = xr.DataArray(np.full(n_time, np.nan), dims=['time'], coords={'time': time})

    has_contributions = False
    for i, e in enumerate(effects):
        if e.minimum is not None:
            min_bound[i] = e.minimum
        if e.maximum is not None:
            max_bound[i] = e.maximum
        if e.minimum_per_period is not None:
            min_per_period[i] = e.minimum_per_period
        if e.maximum_per_period is not None:
            max_per_period[i] = e.maximum_per_period
        min_per_hours.append(
            as_dataarray(e.minimum_per_hour, {'time': time}) if e.minimum_per_hour is not None else nan_time
        )
        max_per_hours.append(
            as_dataarray(e.maximum_per_hour, {'time': time}) if e.maximum_per_hour is not None else nan_time
        )
        if e.contribution_from:
            has_contributions = True

    # Build cross-effect contribution arrays
    cf_temporal: xr.DataArray | None = None
    if has_contributions:
        # Self-reference check (runs over all effects before anything else is validated)
        for e in effects:
            for src_id in e.contribution_from:
                if src_id == e.id:
                    raise ValueError(f'Effect {e.id!r} cannot reference itself in contribution_from')

        # Unknown-source check and cycle check. Every source id is validated
        # here, so the fill loop below can assume all ids are known.
        adjacency: dict[str, list[str]] = {eid: [] for eid in effect_ids}
        for e in effects:
            for src_id in e.contribution_from:
                if src_id not in effect_set:
                    raise ValueError(f'Unknown effect {src_id!r} in contribution_from on {e.id!r}')
                adjacency[e.id].append(src_id)
        cycle = _detect_contribution_cycle(adjacency)
        if cycle is not None:
            raise ValueError(f'Circular contribution_from dependency: {" -> ".join(cycle)}')

        tmpl_t = _effect_template({'effect': effect_ids, 'source_effect': effect_ids, 'time': time}, period)
        temporal_mat = tmpl_t.zeros()
        for e in effects:
            for src_id, factor in e.contribution_from.items():
                temporal_mat.loc[e.id, src_id] = as_dataarray(factor, tmpl_t.as_da_coords)
        cf_temporal = temporal_mat

    effect_idx = pd.Index(effect_ids, name='effect')

    # Per-effect period weights; rows stay NaN for effects that declare none.
    pw: xr.DataArray | None = None
    if period is not None:
        has_pw = any(e.period_weights is not None for e in effects)
        n_periods = len(period)
        if has_pw:
            mat = np.full((n, n_periods), np.nan)
            for i, e in enumerate(effects):
                if e.period_weights is not None:
                    if len(e.period_weights) != n_periods:
                        msg = f'Effect {e.id!r}: period_weights has {len(e.period_weights)} entries, expected {n_periods}'
                        raise ValueError(msg)
                    vals = np.asarray(e.period_weights, dtype=float)
                    if not np.all(np.isfinite(vals)) or not np.all(vals > 0):
                        msg = f'Effect {e.id!r}: period_weights must be positive and finite, got {vals}'
                        raise ValueError(msg)
                    mat[i] = vals
            pw = xr.DataArray(mat, dims=['effect', 'period'], coords={'effect': effect_ids, 'period': period})

    return cls(
        min_bound=xr.DataArray(min_bound, dims=['effect'], coords={'effect': effect_ids}),
        max_bound=xr.DataArray(max_bound, dims=['effect'], coords={'effect': effect_ids}),
        min_per_period=xr.DataArray(min_per_period, dims=['effect'], coords={'effect': effect_ids}),
        max_per_period=xr.DataArray(max_per_period, dims=['effect'], coords={'effect': effect_ids}),
        min_per_hour=fast_concat(min_per_hours, effect_idx),
        max_per_hour=fast_concat(max_per_hours, effect_idx),
        cf_temporal=cf_temporal,
        period_weights=pw,
    )

StoragesData dataclass

StoragesData(
    capacity: DataArray,
    eta_c: DataArray,
    eta_d: DataArray,
    loss: DataArray,
    rel_level_lb: DataArray,
    rel_level_ub: DataArray,
    prior_level: DataArray,
    cyclic: DataArray,
    charge_flow: DataArray,
    discharge_flow: DataArray,
    sizing_min: DataArray | None = None,
    sizing_max: DataArray | None = None,
    sizing_mandatory: DataArray | None = None,
    sizing_effects_per_size: DataArray | None = None,
    sizing_effects_fixed: DataArray | None = None,
    invest_min: DataArray | None = None,
    invest_max: DataArray | None = None,
    invest_mandatory: DataArray | None = None,
    invest_lifetime: DataArray | None = None,
    invest_prior_size: DataArray | None = None,
    invest_effects_per_size_at_build: DataArray | None = None,
    invest_effects_fixed_at_build: DataArray | None = None,
    invest_effects_per_size_recurring: DataArray | None = None,
    invest_effects_fixed_recurring: DataArray | None = None,
)

Methods:

Name Description
__post_init__

Validate capacity, efficiencies, and loss rates.

to_dataset

Serialize to xr.Dataset.

from_dataset

Deserialize from xr.Dataset.

build

Build StoragesData from element objects.

__post_init__

__post_init__() -> None

Validate capacity, efficiencies, and loss rates.

Source code in src/fluxopt/model_data.py
def __post_init__(self) -> None:
    """Validate capacity, efficiencies, and loss rates.

    Capacity must not be negative (NaN entries are skipped). ``eta_c`` and
    ``eta_d`` must lie in ``(0, 1]`` and ``loss`` in ``[0, 1]``. For these
    three, every dimension other than ``'storage'`` is collapsed, so a
    storage is reported as soon as one entry is out of range.

    Raises:
        ValueError: If any of the checks fails; the message lists the
            offending storage ids.
    """
    s = self.capacity.coords['storage']

    def per_storage(mask):
        # Reduce over whatever non-storage dims the array has instead of a
        # hard-coded 'time', so arrays without a time dim or with further
        # dims still yield one flag per storage.
        extra_dims = [d for d in mask.dims if d != 'storage']
        return mask.any(extra_dims) if extra_dims else mask

    cap = self.capacity
    bad_cap = ~np.isnan(cap) & (cap < 0)
    if bad_cap.any():
        raise ValueError(f'Negative capacity on storages: {list(s[bad_cap].values)}')
    bad_eta_c = per_storage((self.eta_c <= 0) | (self.eta_c > 1))
    if bad_eta_c.any():
        raise ValueError(f'eta_charge must be in (0, 1] on storages: {list(s[bad_eta_c].values)}')
    bad_eta_d = per_storage((self.eta_d <= 0) | (self.eta_d > 1))
    if bad_eta_d.any():
        raise ValueError(f'eta_discharge must be in (0, 1] on storages: {list(s[bad_eta_d].values)}')
    bad_loss = per_storage((self.loss < 0) | (self.loss > 1))
    if bad_loss.any():
        raise ValueError(f'relative_loss_per_hour must be in [0, 1] on storages: {list(s[bad_loss].values)}')

to_dataset

to_dataset() -> Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize to xr.Dataset.

    Delegates to the module-level ``_to_dataset`` helper, passing this
    instance, and returns that helper's result unchanged.
    """
    # NOTE(review): presumably emits one variable per dataclass field, which is
    # what ``from_dataset`` looks up by name — confirm against ``_to_dataset``.
    return _to_dataset(self)

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default

ds

Dataset

Dataset with matching variable names.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Rebuild an instance from a dataset.

    Each dataclass field is looked up in ``ds`` by its name. A field with
    no matching entry is passed to the constructor as ``None``.

    Args:
        ds: Dataset with matching variable names.
    """
    init_args: dict[str, Any] = {}
    for spec in fields(cls):
        init_args[spec.name] = ds.get(spec.name)
    return cls(**init_args)

build classmethod

build(
    storages: list[Storage],
    time: TimeIndex,
    dt: DataArray,
    effects: list[Effect] | None = None,
    period: Index | None = None,
) -> Self | None

Build StoragesData from element objects.

Parameters:

Name Type Description Default

storages

list[Storage]

Storage definitions.

required

time

TimeIndex

Time index.

required

dt

DataArray

Timestep durations.

required

effects

list[Effect] | None

Effect definitions for sizing cost validation.

None

period

Index | None

Period index for period-varying effects.

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    storages: list[Storage],
    time: TimeIndex,
    dt: xr.DataArray,
    effects: list[Effect] | None = None,
    period: pd.Index | None = None,
) -> Self | None:
    """Assemble the per-storage arrays from a list of storage elements.

    A capacity given as a ``Sizing`` or ``Investment`` object is collected
    for the sizing / investment array builders; a plain capacity value is
    written into the capacity array; a ``None`` capacity leaves NaN there.

    Args:
        storages: Storage definitions.
        time: Time index.
        dt: Timestep durations. Not read by this method.
        effects: Effect definitions; their ids are forwarded to the sizing
            and investment array builders.
        period: Period index, forwarded to the sizing and investment
            array builders.

    Returns:
        The populated instance, or ``None`` when ``storages`` is empty.
    """
    from fluxopt.elements import Investment, Sizing

    if not storages:
        return None

    effect_names = [e.id for e in effects] if effects else []
    storage_names = [stor.id for stor in storages]
    count = len(storages)

    def over_time(value: Any) -> xr.DataArray:
        # Expand a per-storage parameter against the time index.
        return as_dataarray(value, {'time': time})

    def per_storage(values: Any) -> xr.DataArray:
        # Wrap one value per storage in a labelled 1-D array.
        return xr.DataArray(values, dims=['storage'], coords={'storage': storage_names})

    capacities = np.full(count, np.nan)
    prior_levels = np.full(count, np.nan)
    is_cyclic = np.zeros(count, dtype=bool)
    charge_effs: list[xr.DataArray] = []
    discharge_effs: list[xr.DataArray] = []
    loss_rates: list[xr.DataArray] = []
    lower_levels: list[xr.DataArray] = []
    upper_levels: list[xr.DataArray] = []
    charge_ids: list[str] = []
    discharge_ids: list[str] = []
    sized: list[tuple[str, Sizing]] = []
    invested: list[tuple[str, Investment]] = []

    for pos, stor in enumerate(storages):
        cap = stor.capacity
        if isinstance(cap, Sizing):
            sized.append((stor.id, cap))
        elif isinstance(cap, Investment):
            invested.append((stor.id, cap))
        elif cap is not None:
            capacities[pos] = cap

        charge_effs.append(over_time(stor.eta_charge))
        discharge_effs.append(over_time(stor.eta_discharge))
        loss_rates.append(over_time(stor.relative_loss_per_hour))

        lower_levels.append(over_time(stor.relative_minimum_level))
        upper_levels.append(over_time(stor.relative_maximum_level))

        is_cyclic[pos] = stor.cyclic
        if stor.prior_level is not None:
            prior_levels[pos] = stor.prior_level

        charge_ids.append(stor.charging.id)
        discharge_ids.append(stor.discharging.id)

    storage_index = pd.Index(storage_names, name='storage')
    sizing = _SizingArrays.build(sized, effect_names, dim='sizing_storage', period=period)
    invest = _InvestmentArrays.build(invested, effect_names, dim='invest_storage', period=period)

    return cls(
        capacity=per_storage(capacities),
        eta_c=xr.concat(charge_effs, dim=storage_index),
        eta_d=xr.concat(discharge_effs, dim=storage_index),
        loss=xr.concat(loss_rates, dim=storage_index),
        rel_level_lb=xr.concat(lower_levels, dim=storage_index),
        rel_level_ub=xr.concat(upper_levels, dim=storage_index),
        prior_level=per_storage(prior_levels),
        cyclic=per_storage(is_cyclic),
        charge_flow=per_storage(charge_ids),
        discharge_flow=per_storage(discharge_ids),
        sizing_min=sizing.min,
        sizing_max=sizing.max,
        sizing_mandatory=sizing.mandatory,
        sizing_effects_per_size=sizing.effects_per_size,
        sizing_effects_fixed=sizing.effects_fixed,
        invest_min=invest.min,
        invest_max=invest.max,
        invest_mandatory=invest.mandatory,
        invest_lifetime=invest.lifetime,
        invest_prior_size=invest.prior_size,
        invest_effects_per_size_at_build=invest.effects_per_size_at_build,
        invest_effects_fixed_at_build=invest.effects_fixed_at_build,
        invest_effects_per_size_recurring=invest.effects_per_size_recurring,
        invest_effects_fixed_recurring=invest.effects_fixed_recurring,
    )

Dims dataclass

Dims(
    time: DataArray,
    dt: DataArray,
    weights: DataArray,
    period: DataArray | None = None,
    period_weights: DataArray | None = None,
)

Shared model coordinates and temporal metadata.

Owns the time and period dimensions, timestep durations, and weights.

Methods:

Name Description
coords

Return shared coordinates for variable/DataArray creation.

to_dataset

Serialize to xr.Dataset.

from_dataset

Deserialize from xr.Dataset.

build

Build Dims from a time index and optional periods.

coords

coords(*, time: bool = False, period: bool = False) -> dict[str, DataArray]

Return shared coordinates for variable/DataArray creation.

Also the single point of truth for the model's variate dims used by `fluxopt.types.as_dataarray`: pick the reach a field supports (e.g. `coords(time=True, period=True)` for operational profiles, `coords(period=True)` for investment-time fields). When a new variate dim (e.g. `scenario`) is added, extend this method once and every call site picks it up.

Parameters:

Name Type Description Default

time

bool

Include the time coordinate.

False

period

bool

Include the period coordinate (no-op in single-period mode).

False
Source code in src/fluxopt/model_data.py
def coords(self, *, time: bool = False, period: bool = False) -> dict[str, xr.DataArray]:
    """Select the shared coordinates to use when creating variables or DataArrays.

    This method is intended as the one place that defines which variate
    dims the model has, for use with :func:`fluxopt.types.as_dataarray`.
    Callers choose the reach a field supports, e.g.
    ``coords(time=True, period=True)`` for operational profiles or
    ``coords(period=True)`` for investment-time fields. Adding a new
    variate dim (e.g. ``scenario``) should only require extending this
    method; every call site then picks it up.

    Args:
        time: Include the time coordinate.
        period: Include the period coordinate (no-op in single-period mode).

    Returns:
        A new dict holding the requested coordinates, ``'time'`` before
        ``'period'``. The period entry is left out when no period
        coordinate is set, even if it was requested.
    """
    time_part = {'time': self.time} if time else {}
    period_part = {'period': self.period} if period and self.period is not None else {}
    return {**time_part, **period_part}

to_dataset

to_dataset() -> Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Pack the temporal metadata into an xr.Dataset.

    ``dt`` and ``weights`` are always written. ``period`` and
    ``period_weights`` are written only when they are set. ``time`` is not
    added as a variable of its own.
    """
    data_vars: dict[str, xr.DataArray] = {'dt': self.dt, 'weights': self.weights}
    optional_vars = {'period': self.period, 'period_weights': self.period_weights}
    data_vars.update({name: value for name, value in optional_vars.items() if value is not None})
    return xr.Dataset(data_vars)

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default

ds

Dataset

Dataset with dt, weights, and optional period fields.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Rebuild the temporal metadata from a dataset.

    The time coordinate is taken from the ``time`` coordinate of the
    ``dt`` variable. ``period`` and ``period_weights`` become ``None``
    when the dataset does not contain them.

    Args:
        ds: Dataset with dt, weights, and optional period fields.
    """
    step_lengths = ds['dt']
    init_args = {
        'time': step_lengths.coords['time'],
        'dt': step_lengths,
        'weights': ds['weights'],
        'period': ds.get('period'),
        'period_weights': ds.get('period_weights'),
    }
    return cls(**init_args)

build classmethod

build(
    time: TimeIndex,
    dt: DataArray,
    periods: list[int] | Index | None = None,
    period_weights: list[float] | None = None,
) -> Self

Build Dims from a time index and optional periods.

Parameters:

Name Type Description Default

time

TimeIndex

Normalized time index.

required

dt

DataArray

Timestep durations.

required

periods

list[int] | Index | None

Integer period labels for multi-period optimization.

None

period_weights

list[float] | None

Explicit weights per period. Inferred from gaps if None.

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    time: TimeIndex,
    dt: xr.DataArray,
    periods: list[int] | pd.Index | None = None,
    period_weights: list[float] | None = None,
) -> Self:
    """Create the shared temporal metadata from a time index and optional periods.

    Timestep weights are initialised to one for every timestep. Period
    data is only produced when ``periods`` is given; otherwise both
    period fields stay ``None``.

    Args:
        time: Normalized time index.
        dt: Timestep durations.
        periods: Integer period labels for multi-period optimization.
        period_weights: Explicit weights per period. Inferred from gaps if None.
    """

    def along_time(values: Any, **extra: Any) -> xr.DataArray:
        # Label a per-timestep array with the time index.
        return xr.DataArray(values, dims=['time'], coords={'time': time}, **extra)

    time_coord = along_time(time)
    unit_weights = along_time(np.ones(len(time)), name='weight')

    if periods is None:
        period_da: xr.DataArray | None = None
        period_weights_da: xr.DataArray | None = None
    else:
        period_labels, period_weights_da = _compute_period_weights(periods, period_weights)
        period_da = xr.DataArray(period_labels.values, dims=['period'], coords={'period': period_labels})

    return cls(
        time=time_coord,
        dt=dt,
        weights=unit_weights,
        period=period_da,
        period_weights=period_weights_da,
    )

ModelData dataclass

ModelData(
    flows: FlowsData,
    carriers: CarriersData,
    converters: ConvertersData | None,
    effects: EffectsData,
    storages: StoragesData | None,
    dims: Dims,
    piecewise: PiecewiseData | None = None,
)

Methods:

Name Description
to_netcdf

Write model data as NetCDF groups under /model/.

from_netcdf

Read model data from NetCDF groups.

build

Build ModelData from element objects.

to_netcdf

to_netcdf(path: str | Path, *, mode: Literal['w', 'a'] = 'a') -> None

Write model data as NetCDF groups under /model/.

Parameters:

Name Type Description Default

path

str | Path

Output file path.

required

mode

Literal['w', 'a']

Write mode ('w' to overwrite, 'a' to append).

'a'
Source code in src/fluxopt/model_data.py
def to_netcdf(self, path: str | Path, *, mode: Literal['w', 'a'] = 'a') -> None:
    """Write every model data section to its own NetCDF group.

    Sections that are ``None`` are skipped. The group of each section is
    looked up in ``_NC_GROUPS``; the temporal metadata always goes to
    ``model/meta`` and is written last.

    Args:
        path: Output file path.
        mode: Write mode ('w' to overwrite, 'a' to append).
    """
    target = Path(path)
    sections = (
        ('flows', self.flows),
        ('carriers', self.carriers),
        ('converters', self.converters),
        ('effects', self.effects),
        ('storages', self.storages),
        ('piecewise', self.piecewise),
    )

    # Only the first write honours the caller's mode; every later write
    # appends to the same file.
    next_mode = mode
    for key, section in sections:
        if section is None:
            continue
        section.to_dataset().to_netcdf(target, mode=next_mode, group=_NC_GROUPS[key], engine='netcdf4')
        next_mode = 'a'

    self.dims.to_dataset().to_netcdf(target, mode=next_mode, group='model/meta', engine='netcdf4')

from_netcdf classmethod

from_netcdf(path: str | Path) -> ModelData

Read model data from NetCDF groups.

Parameters:

Name Type Description Default

path

str | Path

Input file path.

required

Raises:

Type Description
OSError

If no model data groups are found in the file.

Source code in src/fluxopt/model_data.py
@classmethod
def from_netcdf(cls, path: str | Path) -> ModelData:
    """Load model data previously written as NetCDF groups.

    The ``model/meta`` group is loaded first, without any fallback. Every
    group listed in ``_NC_GROUPS`` is then loaded; a group whose load
    raises ``OSError`` is replaced by an empty dataset. Converters,
    storages, and piecewise data become ``None`` when their dataset has no
    data variables.

    Args:
        path: Input file path.

    Raises:
        OSError: If no model data groups are found in the file.
    """
    source = Path(path)
    meta = xr.load_dataset(source, group='model/meta', engine='netcdf4')

    loaded: dict[str, xr.Dataset] = {}
    for key, group in _NC_GROUPS.items():
        try:
            section = xr.load_dataset(source, group=group, engine='netcdf4')
        except OSError:
            section = xr.Dataset()
        loaded[key] = section

    def optional(data_cls: Any, key: str) -> Any:
        # Optional sections are only rebuilt when something was stored for them.
        stored = loaded[key]
        return data_cls.from_dataset(stored) if stored.data_vars else None

    flows = FlowsData.from_dataset(loaded['flows'])
    carriers = CarriersData.from_dataset(loaded['carriers'])
    converters = optional(ConvertersData, 'converters')
    effects = EffectsData.from_dataset(loaded['effects'])
    storages = optional(StoragesData, 'storages')
    piecewise = optional(PiecewiseData, 'piecewise')
    dims = Dims.from_dataset(meta)

    return cls(
        flows=flows,
        carriers=carriers,
        converters=converters,
        effects=effects,
        storages=storages,
        dims=dims,
        piecewise=piecewise,
    )

build classmethod

build(
    timesteps: Timesteps,
    carriers: list[Carrier],
    effects: list[Effect],
    ports: list[Port],
    converters: list[Converter] | None = None,
    storages: list[Storage] | None = None,
    dt: float | list[float] | None = None,
    periods: list[int] | Index | None = None,
    period_weights: list[float] | None = None,
) -> Self

Build ModelData from element objects.

Parameters:

Name Type Description Default

timesteps

Timesteps

Time index for the optimization horizon.

required

carriers

list[Carrier]

Carrier declarations.

required

effects

list[Effect]

Effects to track.

required

ports

list[Port]

System boundary ports.

required

converters

list[Converter] | None

Linear converters.

None

storages

list[Storage] | None

Energy storages.

None

dt

float | list[float] | None

Timestep duration in hours. Auto-derived if None.

None

periods

list[int] | Index | None

Integer period labels for multi-period optimization.

None

period_weights

list[float] | None

Explicit weights per period. Inferred from gaps if None.

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    timesteps: Timesteps,
    carriers: list[Carrier],
    effects: list[Effect],
    ports: list[Port],
    converters: list[Converter] | None = None,
    storages: list[Storage] | None = None,
    dt: float | list[float] | None = None,
    periods: list[int] | pd.Index | None = None,
    period_weights: list[float] | None = None,
) -> Self:
    """Assemble the complete model data from element objects.

    The timesteps are normalized and the step durations computed first. A
    penalty effect is appended when none of the given effects carries the
    penalty id. Flows are then collected, the system is validated, and the
    individual data containers are built.

    Args:
        timesteps: Time index for the optimization horizon.
        carriers: Carrier declarations.
        effects: Effects to track.
        ports: System boundary ports.
        converters: Linear converters.
        storages: Energy storages.
        dt: Timestep duration in hours. Auto-derived if None.
        periods: Integer period labels for multi-period optimization.
        period_weights: Explicit weights per period. Inferred from gaps if None.
    """
    from fluxopt.elements import PENALTY_EFFECT_ID, Effect
    from fluxopt.types import compute_dt as _compute_dt

    converter_list = converters or []
    storage_list = storages or []
    time_index = normalize_timesteps(timesteps)
    step_durations = _compute_dt(time_index, dt)

    has_penalty = any(e.id == PENALTY_EFFECT_ID for e in effects)
    if not has_penalty:
        effects = [*effects, Effect(PENALTY_EFFECT_ID)]

    flows, carrier_coeff = _collect_flows(ports, converter_list, storage_list)
    _validate_system(effects, ports, converter_list, storage_list, flows, carriers)

    dims = Dims.build(time_index, step_durations, periods=periods, period_weights=period_weights)

    # Prior-duration computation takes a scalar dt: the first timestep's duration.
    first_dt = float(dims.dt.values[0])
    if dims.period is not None:
        period_index = pd.Index(dims.period.values)
    else:
        period_index = None

    # Component-level status entries: (component id, status, ids of its flows).
    storage_status: list[tuple[str, Status, list[str]]] = [
        (stor.id, stor.status, [stor.charging.id, stor.discharging.id])
        for stor in storage_list
        if stor.status is not None
    ]
    converter_status: list[tuple[str, Status, list[str]]] = [
        (conv.id, conv.conversion.status, [flow.id for flow in (*conv.inputs, *conv.outputs)])
        for conv in converter_list
        if conv.conversion is not None and conv.conversion.status is not None
    ]
    component_status = [*storage_status, *converter_status]

    flows_data = FlowsData.build(
        flows,
        time_index,
        effects,
        dt=first_dt,
        period=period_index,
        component_status_items=component_status,
    )
    carriers_data = CarriersData.build(carriers, flows, carrier_coeff)
    converters_data = ConvertersData.build(converter_list, time_index)
    effects_data = EffectsData.build(effects, time_index, period=period_index)
    storages_data = StoragesData.build(storage_list, time_index, dims.dt, effects, period=period_index)
    piecewise_data = PiecewiseData.build(converter_list, time_index)

    return cls(
        flows=flows_data,
        carriers=carriers_data,
        converters=converters_data,
        effects=effects_data,
        storages=storages_data,
        dims=dims,
        piecewise=piecewise_data,
    )