
fluxopt

Modules:

Name Description
components
constraints

Constraint builder functions for the optimization model.

contributions

Per-contributor effect breakdown.

elements
model
model_data
results
stats

Derived statistics from optimization results.

types

Type Aliases:

Name Description
Variate

Any input that varies over a subset of the model's variate dims (time, optionally period, eventually scenario).

Classes:

Name Description
Converter

Conversion between input and output flows.

Port

System boundary that imports from or exports to buses.

Carrier

Physical energy medium (electricity, heat, gas, …).

Effect

A tracked quantity across the optimization horizon (cost, CO₂, …).

Flow

A single energy flow on a carrier.

Investment

Singular discrete build-timing optimization.

PiecewiseConversion

Piecewise-linear conversion linking N flows.

Sizing

Capacity optimization parameters.

Status

Binary on/off behavior parameters.

Storage

Energy storage with level dynamics.

FlowSystem
Dims

Shared model coordinates and temporal metadata.

ModelData
Result

Optimization result with solution variables and model data.

IdList

Frozen, ordered container with access by id (str) or position (int).

Functions:

Name Description
as_dataarray

Convert a Variate to a DataArray aligned to given coordinates.

optimize

Build data, build model, optimize, return results.

Variate

Any input that varies over a subset of the model's variate dims (time, optionally period, eventually scenario).

  • Scalar: broadcast to all variate dims.
  • 1-D (list/ndarray): matched to a coord by length (must be unambiguous).
  • 1-D (pd.Series): index name selects the dim if set; else matched by length.
  • 2-D (pd.DataFrame): index.name and columns.name must match target dims.
  • n-D (xr.DataArray): dims must be a subset of the target; coords must match exactly.

Per-field reach (which dims a particular field can vary over) is documented on the field itself; as_dataarray enforces that user input only uses dims the caller declared in coords.
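The length-matching rule for 1-D list/ndarray input can be pictured with a small stand-alone sketch. It does not call fluxopt: `match_dim_by_length` is a hypothetical helper written for illustration, and the real as_dataarray may differ in details such as its error messages.

```python
def match_dim_by_length(values: list[float], coords: dict[str, list]) -> str:
    """Return the single dim whose coordinate has the same length as `values`.

    Hypothetical helper for illustration only; not part of fluxopt.
    """
    candidates = [dim for dim, coord in coords.items() if len(coord) == len(values)]
    if len(candidates) != 1:
        # Zero matches or several matches: the input cannot be placed unambiguously.
        raise ValueError(
            f'cannot match input of length {len(values)} unambiguously; candidates: {candidates}'
        )
    return candidates[0]


coords = {'time': [0, 1, 2, 3], 'period': [2025, 2030]}
dim_for_profile = match_dim_by_length([1.0, 0.8, 0.6, 0.9], coords)  # four values
dim_for_growth = match_dim_by_length([1.0, 1.1], coords)  # two values
```

If time and period happened to have the same length, the helper would raise. In that situation a named pd.Series or an xr.DataArray states the intended dim explicitly.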

Converter dataclass

Conversion between input and output flows.

Two mutually exclusive modes:

  • Linear: conversion_factors=[{flow_short_id: a_f}, ...], one dict per equation; constraint sum_f(a_f * P_{f,t}) = 0.
  • Piecewise: conversion=PiecewiseConversion(...); the solver interpolates between breakpoints, optionally with on/off via PiecewiseConversion.status.
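To make the linear-mode constraint concrete, the sketch below evaluates sum_f(a_f * P_f) for one equation at one timestep in plain Python. The function `conversion_residual` and the flow names 'gas' and 'heat' are illustrative assumptions. In the actual model this expression is a solver constraint, not a function call.

```python
def conversion_residual(factors: dict[str, float], rates: dict[str, float]) -> float:
    """Evaluate sum_f(a_f * P_f) for one conversion equation at one timestep."""
    return sum(a * rates[flow] for flow, a in factors.items())


# Boiler with eta = 0.9, written as one equation: 0.9 * gas - 1 * heat = 0
boiler_equation = {'gas': 0.9, 'heat': -1.0}

# 10 MW of gas yields 9 MW of heat, so the residual vanishes.
balanced = conversion_residual(boiler_equation, {'gas': 10.0, 'heat': 9.0})

# Only 8 MW of heat would leave 1 MW unaccounted for, violating the constraint.
unbalanced = conversion_residual(boiler_equation, {'gas': 10.0, 'heat': 8.0})
```

Inputs carry positive coefficients and outputs negative ones, which is the same sign convention the classmethods on this page use when they build conversion_factors.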

Parameters:

Name Type Description Default

id

str

Converter id.

required

inputs

list[Flow] | IdList[Flow]

Input flows.

required

outputs

list[Flow] | IdList[Flow]

Output flows.

required

conversion_factors

list[dict[str, Variate]]

Linear-mode equations. Empty when conversion is set.

list()

conversion

PiecewiseConversion | None

Piecewise-mode curve. None for linear mode.

None

Methods:

Name Description
__post_init__

Qualify flow ids and validate mode exclusivity.

boiler

Create a boiler converter: fuel * eta = thermal.

heat_pump

Create a heat pump converter with source heat.

power2heat

Create an electric resistance heater: electrical * eta = thermal.

chp

Create a CHP converter with separate electrical and thermal outputs.

__post_init__

__post_init__() -> None

Qualify flow ids and validate mode exclusivity.

Source code in src/fluxopt/components.py
def __post_init__(self) -> None:
    """Qualify flow ids and validate mode exclusivity."""
    self.inputs = _qualify_flows(self.id, list(self.inputs))
    self.outputs = _qualify_flows(self.id, list(self.outputs))
    self._short_to_id = {f.short_id: f.id for f in (*self.inputs, *self.outputs)}

    if self.conversion is not None:
        if self.conversion_factors:
            msg = (
                f'Converter {self.id!r}: cannot set both conversion_factors and conversion '
                f'(they are mutually exclusive linear vs piecewise modes)'
            )
            raise ValueError(msg)
        curve_flows = {flow for flow, _, _ in self.conversion._iter_normalized()}
        unknown = curve_flows - set(self._short_to_id)
        if unknown:
            msg = (
                f'Converter {self.id!r}: PiecewiseConversion references unknown flow short_ids '
                f'{sorted(unknown)}; known: {sorted(self._short_to_id)}'
            )
            raise ValueError(msg)
        if self.conversion.status is not None:
            for f in (*self.inputs, *self.outputs):
                if f.status is not None:
                    msg = (
                        f'Converter {self.id!r}: flow {f.short_id!r} cannot have flow-level '
                        f'status when PiecewiseConversion.status is set'
                    )
                    raise ValueError(msg)

boiler classmethod

Create a boiler converter: fuel * eta = thermal.

Parameters:

Name Type Description Default

id

str

Converter id.

required

thermal_efficiency

Variate

Thermal efficiency eta.

required

fuel_flow

Flow

Input fuel flow.

required

thermal_flow

Flow

Output thermal flow.

required
Source code in src/fluxopt/components.py
@classmethod
def boiler(cls, id: str, thermal_efficiency: Variate, fuel_flow: Flow, thermal_flow: Flow) -> Converter:
    """Create a boiler converter: fuel * eta = thermal.

    Args:
        id: Converter id.
        thermal_efficiency: Thermal efficiency eta.
        fuel_flow: Input fuel flow.
        thermal_flow: Output thermal flow.
    """
    return cls._single_io(id, thermal_efficiency, fuel_flow, thermal_flow)

heat_pump classmethod

Create a heat pump converter with source heat.

Two conversion equations:

  • electrical * COP = thermal
  • electrical + source = thermal

Parameters:

Name Type Description Default

id

str

Converter id.

required

cop

Variate

Coefficient of performance.

required

electrical_flow

Flow

Input electrical flow.

required

source_flow

Flow

Input environmental heat flow (air, ground, water).

required

thermal_flow

Flow

Output thermal flow.

required
Source code in src/fluxopt/components.py
@classmethod
def heat_pump(
    cls,
    id: str,
    cop: Variate,
    electrical_flow: Flow,
    source_flow: Flow,
    thermal_flow: Flow,
) -> Converter:
    """Create a heat pump converter with source heat.

    Two conversion equations:
        electrical * COP = thermal
        electrical + source = thermal

    Args:
        id: Converter id.
        cop: Coefficient of performance.
        electrical_flow: Input electrical flow.
        source_flow: Input environmental heat flow (air, ground, water).
        thermal_flow: Output thermal flow.
    """
    return cls(
        id,
        inputs=[electrical_flow, source_flow],
        outputs=[thermal_flow],
        conversion_factors=[
            {electrical_flow.short_id: cop, thermal_flow.short_id: -1},
            {electrical_flow.short_id: 1, source_flow.short_id: 1, thermal_flow.short_id: -1},
        ],
    )
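As a worked example of the two heat pump equations, the following stand-alone sketch solves them for a given thermal output. `heat_pump_operating_point` is a hypothetical helper, not a fluxopt function, and the numbers are made up.

```python
def heat_pump_operating_point(thermal: float, cop: float) -> tuple[float, float]:
    """Solve the two heat pump equations for the electrical and source inputs."""
    electrical = thermal / cop  # from electrical * COP = thermal
    source = thermal - electrical  # from electrical + source = thermal
    return electrical, source


thermal_demand = 7.0  # MW of heat delivered
cop = 3.5
electrical, source = heat_pump_operating_point(thermal_demand, cop)

# Residuals of the two conversion_factors rows; both should be zero.
row_1 = cop * electrical - thermal_demand
row_2 = electrical + source - thermal_demand
```

With a COP of 3.5, delivering 7 MW of heat takes 2 MW of electricity and 5 MW of environmental heat.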

power2heat classmethod

Create an electric resistance heater: electrical * eta = thermal.

Parameters:

Name Type Description Default

id

str

Converter id.

required

efficiency

Variate

Electrical-to-thermal efficiency.

required

electrical_flow

Flow

Input electrical flow.

required

thermal_flow

Flow

Output thermal flow.

required
Source code in src/fluxopt/components.py
@classmethod
def power2heat(cls, id: str, efficiency: Variate, electrical_flow: Flow, thermal_flow: Flow) -> Converter:
    """Create an electric resistance heater: electrical * eta = thermal.

    Args:
        id: Converter id.
        efficiency: Electrical-to-thermal efficiency.
        electrical_flow: Input electrical flow.
        thermal_flow: Output thermal flow.
    """
    return cls._single_io(id, efficiency, electrical_flow, thermal_flow)

chp classmethod

Create a CHP converter with separate electrical and thermal outputs.

Parameters:

Name Type Description Default

id

str

Converter id.

required

eta_el

Variate

Electrical efficiency.

required

eta_th

Variate

Thermal efficiency.

required

fuel_flow

Flow

Input fuel flow.

required

electrical_flow

Flow

Output electrical flow.

required

thermal_flow

Flow

Output thermal flow.

required
Source code in src/fluxopt/components.py
@classmethod
def chp(
    cls,
    id: str,
    eta_el: Variate,
    eta_th: Variate,
    fuel_flow: Flow,
    electrical_flow: Flow,
    thermal_flow: Flow,
) -> Converter:
    """Create a CHP converter with separate electrical and thermal outputs.

    Args:
        id: Converter id.
        eta_el: Electrical efficiency.
        eta_th: Thermal efficiency.
        fuel_flow: Input fuel flow.
        electrical_flow: Output electrical flow.
        thermal_flow: Output thermal flow.
    """
    return cls(
        id,
        inputs=[fuel_flow],
        outputs=[electrical_flow, thermal_flow],
        conversion_factors=[
            {fuel_flow.short_id: eta_el, electrical_flow.short_id: -1},
            {fuel_flow.short_id: eta_th, thermal_flow.short_id: -1},
        ],
    )

Port dataclass

Port(
    id: str,
    imports: list[Flow] | IdList[Flow] = list(),
    exports: list[Flow] | IdList[Flow] = list(),
)

System boundary that imports from or exports to buses.

Methods:

Name Description
__post_init__

Qualify flow ids with the port id.

__post_init__

__post_init__() -> None

Qualify flow ids with the port id.

Source code in src/fluxopt/components.py
def __post_init__(self) -> None:
    """Qualify flow ids with the port id."""
    self.imports = _qualify_flows(self.id, list(self.imports))
    self.exports = _qualify_flows(self.id, list(self.exports))

Carrier dataclass

Carrier(
    id: str,
    nodes: list[str] = list(),
    unit: str = 'MWh',
    color: str | None = None,
    description: str = '',
)

Physical energy medium (electricity, heat, gas, …).

Parameters:

Name Type Description Default

id

str

Unique identifier used as xarray coordinate.

required

nodes

list[str]

Sub-nodes for multi-node balancing. Empty means single-node.

list()

unit

str

Energy unit label.

'MWh'

color

str | None

Optional color for plotting.

None

description

str

Human-readable description.

''

Effect dataclass

Effect(
    id: str,
    unit: str = '',
    maximum: float | None = None,
    minimum: float | None = None,
    maximum_per_period: float | None = None,
    minimum_per_period: float | None = None,
    maximum_per_hour: Variate | None = None,
    minimum_per_hour: Variate | None = None,
    contribution_from: dict[str, Variate] = dict(),
    period_weights: list[float] | None = None,
)

A tracked quantity across the optimization horizon (cost, CO₂, …).

One or more effects are designated as the objective via the objective_effects argument of optimize(); when several are named, the sum of their totals is minimized. Other effects can be bounded to enforce budgets (e.g. emission caps).

Effects accumulate contributions from two domains:

  • Temporal — per-timestep flow costs, running costs, startup costs.
  • Lump — one-time sizing costs and fixed costs.

Cross-effect chains (e.g. CO₂ → cost) are supported via contribution_from.

See: docs/math/effects.md
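A minimal sketch of a cross-effect chain with a scalar factor, in plain Python rather than against the fluxopt API. The helper `apply_contributions` and all numbers are hypothetical, and it handles only a single chain level. The exact formulation is the one in docs/math/effects.md.

```python
def apply_contributions(
    direct: dict[str, float],
    contribution_from: dict[str, dict[str, float]],
) -> dict[str, float]:
    """Add factor * source_total to each target effect (single-level chains only)."""
    totals = dict(direct)
    for target, sources in contribution_from.items():
        for source, factor in sources.items():
            # Uses the source's direct total, so chains of chains are not resolved here.
            totals[target] += factor * direct[source]
    return totals


# 120 t of CO2 priced at 50 EUR/t is added on top of 10,000 EUR of direct cost.
direct_totals = {'cost': 10_000.0, 'co2': 120.0}
totals = apply_contributions(direct_totals, {'cost': {'co2': 50.0}})
```

The source effect keeps its own total, so a cap such as maximum on 'co2' can still be applied independently of the priced contribution to 'cost'.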

Parameters:

Name Type Description Default

id

str

Unique identifier.

required

unit

str

Unit label (e.g. '€', 'kg').

''

maximum

float | None

Upper bound on weighted total across all periods.

None

minimum

float | None

Lower bound on weighted total across all periods.

None

maximum_per_period

float | None

Upper bound applied to each period independently.

None

minimum_per_period

float | None

Lower bound applied to each period independently.

None

maximum_per_hour

Variate | None

Upper bound rate [unit/h], scaled by Δt.

None

minimum_per_hour

Variate | None

Lower bound rate [unit/h], scaled by Δt.

None

contribution_from

dict[str, Variate]

Cross-effect factors {source_effect: factor}. Scalar factors apply identically to both domains; time-varying factors are averaged for the lump domain.

dict()

period_weights

list[float] | None

Per-period weights ω for total aggregation; overrides global period_weights.

None

Flow dataclass

Flow(
    carrier: str,
    short_id: str = '',
    node: str | None = None,
    size: float | Sizing | Investment | None = None,
    relative_minimum: Variate = 0.0,
    relative_maximum: Variate = 1.0,
    fixed_relative_profile: Variate | None = None,
    effects_per_flow_hour: dict[str, Variate] = dict(),
    status: Status | None = None,
    prior_rates: list[float] | None = None,
)

A single energy flow on a carrier.

short_id defaults to carrier (or carrier:node when node is set). Set it explicitly to disambiguate multiple flows on the same carrier:

Flow('elec')  # short_id='elec'
Flow('heat', node='A')  # short_id='heat:A'
Flow('elec', short_id='base')  # short_id='base'

short_id must be unique within a component. Storage renames colliding short_ids to charge / discharge before qualification. id is the qualified form set by the parent component: component(short_id).

See: docs/math/flows.md
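The naming rules above can be sketched without importing fluxopt. The two helpers below are hypothetical re-implementations inferred from the documented formats (carrier:node and component(short_id)); the library's own node_id and qualified_id helpers may be implemented differently.

```python
from typing import Optional


def default_short_id(carrier: str, node: Optional[str] = None) -> str:
    """Default short_id: the carrier, or 'carrier:node' when a node is set."""
    return f'{carrier}:{node}' if node else carrier


def qualify(component_id: str, short_id: str) -> str:
    """Qualified flow id in the documented 'component(short_id)' form."""
    return f'{component_id}({short_id})'


plain = default_short_id('elec')
with_node = default_short_id('heat', node='A')
qualified = qualify('boiler', with_node)
```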

Parameters:

Name Type Description Default

carrier

str

Carrier this flow connects to.

required

short_id

str

Component-local identifier; defaults to carrier (or carrier:node). The qualified form component(short_id) is stored in id.

''

node

str | None

Sub-node for multi-node carrier balancing.

None

size

float | Sizing | Investment | None

Nominal capacity [MW], a Sizing or Investment for capacity optimization, or None (unsized / unbounded).

None

relative_minimum

Variate

Lower bound as fraction of size.

0.0

relative_maximum

Variate

Upper bound as fraction of size.

1.0

fixed_relative_profile

Variate | None

Fixed profile as fraction of size; sets both lower and upper bounds equal to the profile value.

None

effects_per_flow_hour

dict[str, Variate]

Effect coefficients per flow-hour (e.g. €/MWh).

dict()

status

Status | None

On/off behavior (semi-continuous, startup costs, durations).

None

prior_rates

list[float] | None

Flow rates [MW] before the horizon, used for status initial conditions.

None

Methods:

Name Description
__post_init__

Default short_id from carrier/node, set id = short_id.

__post_init__

__post_init__() -> None

Default short_id from carrier/node, set id = short_id.

Source code in src/fluxopt/elements.py
def __post_init__(self) -> None:
    """Default short_id from carrier/node, set id = short_id."""
    if not self.short_id:
        self.short_id = node_id(self.carrier, self.node) if self.node else self.carrier
    self.id = self.short_id
    if self.status is not None and isinstance(self.relative_minimum, (int, float)) and self.relative_minimum <= 0:
        msg = (
            f'Flow {self.short_id!r}: relative_minimum must be > 0 when status is set, '
            f'otherwise on/off is indistinguishable (got {self.relative_minimum})'
        )
        raise ValueError(msg)

Investment dataclass

Singular discrete build-timing optimization.

The solver decides WHEN to build (which period) and at what size. Once built, capacity is available for lifetime periods. Size is decided once — no growth or partial retirement.
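The build-once behavior can be illustrated with a small sketch that lists the periods in which capacity is available. It assumes that lifetime counts consecutive period entries starting with the build period. That reading is an assumption, and `active_periods` is a hypothetical helper rather than part of fluxopt.

```python
from typing import Optional


def active_periods(periods: list[int], build_period: int, lifetime: Optional[int]) -> list[int]:
    """Periods in which the built capacity is available.

    Assumption: `lifetime` counts period entries, including the build period itself.
    """
    start = periods.index(build_period)
    end = len(periods) if lifetime is None else start + lifetime
    return periods[start:end]  # slicing past the end simply stops at the horizon


periods = [2025, 2030, 2035, 2040]
two_periods = active_periods(periods, build_period=2030, lifetime=2)
forever = active_periods(periods, build_period=2030, lifetime=None)
```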

Parameters:

Name Type Description Default

min_size

float

Minimum capacity if built.

required

max_size

float

Maximum capacity.

required

mandatory

bool

If True, must build exactly once; if False, may build at most once.

True

lifetime

int | None

Periods active after build; None = forever.

None

prior_size

float

Pre-existing capacity available from period 0.

0.0

effects_per_size_at_build

dict[str, Variate]

One-time per-MW costs charged in the build period.

dict()

effects_fixed_at_build

dict[str, Variate]

One-time fixed costs charged in the build period.

dict()

effects_per_size_recurring

dict[str, Variate]

Recurring per-MW costs charged every active period.

dict()

effects_fixed_recurring

dict[str, Variate]

Recurring fixed costs charged every active period.

dict()

PiecewiseConversion dataclass

PiecewiseConversion(
    points: dict[str, list[Variate]] | list[_CurveTuple],
    method: PiecewiseMethod = 'auto',
    status: Status | None = None,
    availability: Variate = 1.0,
)

Piecewise-linear conversion linking N flows.

Wraps linopy.piecewise.add_piecewise_formulation. All flows share interpolation weights: every operating point lies on the same piece of the curve.

Two input forms:

  • Dict: equality-only, terse for the common case:

    PiecewiseConversion({'fuel': [0, 50, 100], 'Heat': [0, 45, 70]})

  • List of tuples: supports per-flow inequality bounds:

    PiecewiseConversion(
        [
            ('fuel', [0, 50, 100]),
            ('Heat', [0, 45, 70], '>='),
        ]
    )

See: docs/math/converters.md
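To show what "shared interpolation weights" means for an equality curve, the sketch below evaluates one operating point by hand. It is not how the solver formulates the problem (that is delegated to linopy). `interpolate_on_curve` is a hypothetical helper, and it assumes the driver flow's breakpoints are strictly increasing.

```python
def interpolate_on_curve(points: dict[str, list[float]], driver: str, value: float) -> dict[str, float]:
    """Locate `value` on the driver flow and apply the same segment weight to every flow."""
    xs = points[driver]
    for i in range(len(xs) - 1):
        lo, hi = xs[i], xs[i + 1]
        if lo <= value <= hi:
            w = (value - lo) / (hi - lo)  # position within segment i, between 0 and 1
            return {flow: bps[i] + w * (bps[i + 1] - bps[i]) for flow, bps in points.items()}
    raise ValueError(f'{value} lies outside the breakpoints of {driver!r}')


curve = {'fuel': [0, 50, 100], 'Heat': [0, 45, 70]}
low_load = interpolate_on_curve(curve, 'fuel', 25.0)   # first segment, slope 0.9
high_load = interpolate_on_curve(curve, 'fuel', 75.0)  # second segment, slope 0.5
```

Because the weight is shared, fuel and Heat always sit on the same segment; the marginal efficiency drops from 0.9 to 0.5 once fuel exceeds 50.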

Parameters:

Name Type Description Default

points

dict[str, list[Variate]] | list[_CurveTuple]

Per-flow breakpoints. Either {flow: [bp...]} (equality only) or a list of (flow, [bp...]) / (flow, [bp...], '<='|'>=') tuples. Need >=2 flows; all breakpoint lists must share the same length (>=2). At most one tuple may carry a non-equality bound, and only when exactly two flows are present.

required

method

PiecewiseMethod

Formulation. "auto" picks LP (2 flows + bounded + matching convexity), else incremental (monotonic) or sos2. Override with "sos2" / "incremental" / "lp".

'auto'

status

Status | None

Component-level on/off behavior gating the curve.

None

availability

Variate

Time-varying scaling of the upper breakpoint.

1.0

Methods:

Name Description
__post_init__

Validate normalized breakpoints and bound combinations.

__post_init__

__post_init__() -> None

Validate normalized breakpoints and bound combinations.

Source code in src/fluxopt/elements.py
def __post_init__(self) -> None:
    """Validate normalized breakpoints and bound combinations."""
    flows_pts_bounds = list(self._iter_normalized())

    if len(flows_pts_bounds) < 2:
        msg = f'PiecewiseConversion needs >=2 flows, got {len(flows_pts_bounds)}'
        raise ValueError(msg)

    n = len(flows_pts_bounds[0][1])
    if n < 2:
        msg = f'PiecewiseConversion needs >=2 breakpoints per flow, got {n}'
        raise ValueError(msg)

    if any(len(pts) != n for _, pts, _ in flows_pts_bounds):
        lengths = {flow: len(pts) for flow, pts, _ in flows_pts_bounds}
        msg = f'PiecewiseConversion breakpoint lists must all have the same length, got {lengths}'
        raise ValueError(msg)

    flows = [flow for flow, _, _ in flows_pts_bounds]
    if len(set(flows)) != len(flows):
        dupes = [f for f in flows if flows.count(f) > 1]
        msg = f'PiecewiseConversion has duplicate flow ids: {sorted(set(dupes))}'
        raise ValueError(msg)

    nonequal = [b for _, _, b in flows_pts_bounds if b != '==']
    if len(nonequal) > 1:
        msg = f'At most one bounded flow per PiecewiseConversion, got {len(nonequal)}'
        raise ValueError(msg)
    if nonequal and len(flows_pts_bounds) > 2:
        msg = f'Inequality bounds require exactly 2 flows, got {len(flows_pts_bounds)}'
        raise ValueError(msg)
    if self.method == 'lp' and not nonequal:
        msg = "method='lp' requires one flow with bound '<=' or '>='"
        raise ValueError(msg)

Sizing dataclass

Capacity optimization parameters.

The solver decides the optimal size within [min_size, max_size].

  • mandatory=True: continuous, size in [min, max], no binary.
  • mandatory=False: binary indicator gates size: 0 or [min, max].
  • min_size == max_size with mandatory=False: binary invest at exact size (yes/no).

See: docs/math/sizing.md
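The three cases above reduce to a simple feasibility rule for the chosen size. The check below is a stand-alone sketch with a hypothetical helper name. In the model, the same logic is expressed through bounds and, for mandatory=False, a binary indicator.

```python
def size_is_feasible(size: float, min_size: float, max_size: float, mandatory: bool = True) -> bool:
    """Check a candidate size against the documented Sizing rules."""
    in_range = min_size <= size <= max_size
    if mandatory:
        return in_range  # continuous size in [min, max], no binary
    return size == 0 or in_range  # optional: either not built, or within [min, max]


must_build = size_is_feasible(0.0, 10.0, 100.0, mandatory=True)    # zero is not allowed
may_skip = size_is_feasible(0.0, 10.0, 100.0, mandatory=False)     # zero means "not built"
below_min = size_is_feasible(5.0, 10.0, 100.0, mandatory=False)    # in the forbidden gap
fixed_size = size_is_feasible(20.0, 20.0, 20.0, mandatory=False)   # yes/no at exactly 20
```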

Parameters:

Name Type Description Default

min_size

float

Minimum capacity if invested.

required

max_size

float

Maximum capacity.

required

mandatory

bool

If True, must be built (no binary indicator).

True

effects_per_size

dict[str, Variate]

Effect cost per unit size (e.g. €/MW).

dict()

effects_fixed

dict[str, Variate]

Fixed effect cost charged if built (applies only when mandatory=False).

dict()

Status dataclass

Status(
    min_uptime: float | None = None,
    max_uptime: float | None = None,
    min_downtime: float | None = None,
    max_downtime: float | None = None,
    effects_per_running_hour: dict[str, Variate] = dict(),
    effects_per_startup: dict[str, Variate] = dict(),
)

Binary on/off behavior parameters.

Together with relative bounds, gives semi-continuous behavior: {0} U [min, max] * size.

See: docs/math/status.md
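The semi-continuous set {0} U [min, max] * size can be spelled out as a feasibility check on a single flow rate. This is an illustrative sketch with a hypothetical helper. In the model the on/off state is a binary decision variable, not an argument.

```python
def rate_is_feasible(
    rate: float,
    size: float,
    relative_minimum: float,
    relative_maximum: float,
    on: bool,
) -> bool:
    """Check a flow rate against the semi-continuous set {0} U [min, max] * size."""
    if not on:
        return rate == 0  # off: the flow is forced to zero
    return relative_minimum * size <= rate <= relative_maximum * size


size = 100.0  # MW
off_at_zero = rate_is_feasible(0.0, size, 0.25, 1.0, on=False)
on_in_band = rate_is_feasible(40.0, size, 0.25, 1.0, on=True)
on_below_min = rate_is_feasible(10.0, size, 0.25, 1.0, on=True)  # inside the forbidden gap (0, 25)
```

The gap between 0 and relative_minimum * size is what distinguishes on from off, which is why Flow rejects a non-positive scalar relative_minimum when status is set.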

Parameters:

Name Type Description Default

min_uptime

float | None

Minimum consecutive on-hours.

None

max_uptime

float | None

Maximum consecutive on-hours.

None

min_downtime

float | None

Minimum consecutive off-hours.

None

max_downtime

float | None

Maximum consecutive off-hours.

None

effects_per_running_hour

dict[str, Variate]

Effect cost per running hour.

dict()

effects_per_startup

dict[str, Variate]

Effect cost per startup event.

dict()

Storage dataclass

Energy storage with level dynamics.

Flow ids are qualified as storage(flow). Both flows must be on the same carrier; otherwise a ValueError is raised. When their short_ids collide (the default for two flows on the same carrier), they are renamed to charge / discharge:

Storage('bat', Flow('elec'), Flow('elec'))  # bat(charge), bat(discharge)
Storage('bat', Flow('elec', short_id='in'), Flow('elec', short_id='out'))  # bat(in), bat(out)

Level balance:

E_{s,t+1} = E_{s,t} (1 - δ)^Δt + P^c η^c Δt - P^d / η^d Δt

See: docs/math/storage.md
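The level balance can be stepped forward by hand to check units and signs. The sketch below implements the formula as written, for scalar parameters. `next_level` and the numbers are illustrative and not part of fluxopt.

```python
def next_level(
    level: float,
    charge: float,
    discharge: float,
    dt: float = 1.0,
    eta_charge: float = 1.0,
    eta_discharge: float = 1.0,
    relative_loss_per_hour: float = 0.0,
) -> float:
    """One step of E_{t+1} = E_t (1 - delta)^dt + P^c eta^c dt - P^d / eta^d dt."""
    return (
        level * (1 - relative_loss_per_hour) ** dt
        + charge * eta_charge * dt
        - discharge / eta_discharge * dt
    )


# Start at 10 MWh, charge 4 MW for one hour, then discharge 1.8 MW for one hour.
levels = [10.0]
for charge, discharge in [(4.0, 0.0), (0.0, 1.8)]:
    levels.append(next_level(levels[-1], charge, discharge, eta_charge=0.9, eta_discharge=0.9))

# Self-discharge only: 1 % per hour on 10 MWh with no charging or discharging.
idle = next_level(10.0, 0.0, 0.0, relative_loss_per_hour=0.01)
```

Charging 4 MW stores only 3.6 MWh, while delivering 1.8 MW drains 2 MWh: both efficiencies reduce what the storage can give back.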

Parameters:

Name Type Description Default

id

str

Storage identifier.

required

charging

Flow

Charging flow.

required

discharging

Flow

Discharging flow.

required

capacity

float | Sizing | Investment | None

Maximum stored energy [MWh], a Sizing or Investment for capacity optimization, or None.

None

eta_charge

Variate

Charging efficiency.

1.0

eta_discharge

Variate

Discharging efficiency.

1.0

relative_loss_per_hour

Variate

Self-discharge rate [1/h].

0.0

prior_level

float | None

Initial energy level [MWh]; None = unconstrained.

None

cyclic

bool

If True, end level must equal start level.

True

relative_minimum_level

Variate

Min SOC as fraction of capacity.

0.0

relative_maximum_level

Variate

Max SOC as fraction of capacity.

1.0

status

Status | None

Component-level on/off behavior gating both charging and discharging. Forbids flow-level status on the child flows (the two switches would have no defined precedence).

None

Methods:

Name Description
__post_init__

Validate carrier match, rename colliding flow ids, and qualify.

__post_init__

__post_init__() -> None

Validate carrier match, rename colliding flow ids, and qualify.

Source code in src/fluxopt/elements.py
def __post_init__(self) -> None:
    """Validate carrier match, rename colliding flow ids, and qualify."""
    if self.charging.carrier != self.discharging.carrier:
        msg = (
            f'Storage {self.id!r}: charging carrier {self.charging.carrier!r} '
            f'!= discharging carrier {self.discharging.carrier!r}'
        )
        raise ValueError(msg)
    if self.charging.short_id == self.discharging.short_id:
        self.charging.short_id = 'charge'
        self.discharging.short_id = 'discharge'
    self.charging.id = qualified_id(self.id, self.charging.short_id)
    self.discharging.id = qualified_id(self.id, self.discharging.short_id)
    if self.status is not None:
        for f in (self.charging, self.discharging):
            if f.status is not None:
                msg = (
                    f'Storage {self.id!r}: flow {f.short_id!r} cannot have flow-level '
                    f'status when Storage.status is set; the component status already gates both flows'
                )
                raise ValueError(msg)
            if f.size is None:
                msg = (
                    f'Storage {self.id!r}: flow {f.short_id!r} must have a size when '
                    f'Storage.status is set — without it, the on/off binary cannot gate '
                    f'the rate (no upper bound to scale)'
                )
                raise ValueError(msg)

FlowSystem

FlowSystem(data: ModelData)

Initialize the flow system optimization model.

Parameters:

Name Type Description Default

data

ModelData

Pre-built model data.

required

Methods:

Name Description
build

Build all variables, constraints, and the objective.

optimize

Build, optionally customize, and solve the model.

solve

Solve the built model and return results.

Source code in src/fluxopt/model.py
def __init__(self, data: ModelData) -> None:
    """Initialize the flow system optimization model.

    Args:
        data: Pre-built model data.
    """
    self.data = data
    self.m = Model()
    self._objective_effects: list[str] = []
    self._piecewise: dict[str, Any] = {}  # conv_id -> linopy.PiecewiseFormulation

build

build() -> None

Build all variables, constraints, and the objective.

Source code in src/fluxopt/model.py
def build(self) -> None:
    """Build all variables, constraints, and the objective."""
    # Phase 1: Decision variables
    self._create_flow_variables()
    self._create_sizing_variables()
    self._create_investment_variables()
    self._create_status_variables()
    self._create_component_status_variables()
    # Phase 2: Flow rate constraints
    self._constrain_flow_rates_plain()
    self._constrain_flow_rates_sizing()
    self._constrain_flow_rates_status()
    self._constrain_flow_rates_component_status()
    # Phase 3: Feature constraints
    self._constrain_sizing()
    self._constrain_investment()
    self._constrain_status()
    self._constrain_component_status()
    # Phase 4: System
    self._create_balance()
    self._create_converter_constraints()
    self._create_piecewise_constraints()
    self._create_storage()
    self._create_effects()
    self._set_objective()
    self._builtin_var_names: frozenset[str] = frozenset(self.m.variables)

optimize

optimize(
    objective_effects: str | list[str],
    customize: Callable[[FlowSystem], None] | None = None,
    *,
    solver: str = 'highs',
    **kwargs: Any,
) -> Result

Build, optionally customize, and solve the model.

Parameters:

Name Type Description Default

objective_effects

str | list[str]

Effect name(s) to minimize. Sum of named effect totals.

required

customize

Callable[[FlowSystem], None] | None

Optional callback to modify the linopy model between build and solve. Receives self; use model.m to add variables/constraints.

None

solver

str

Solver backend name.

'highs'

**kwargs

Any

Passed through to linopy.Model.solve().

{}
Source code in src/fluxopt/model.py
def optimize(
    self,
    objective_effects: str | list[str],
    customize: Callable[[FlowSystem], None] | None = None,
    *,
    solver: str = 'highs',
    **kwargs: Any,
) -> Result:
    """Build, optionally customize, and solve the model.

    Args:
        objective_effects: Effect name(s) to minimize. Sum of named effect totals.
        customize: Optional callback to modify the linopy model between build and solve.
            Receives ``self``; use ``model.m`` to add variables/constraints.
        solver: Solver backend name.
        **kwargs: Passed through to ``linopy.Model.solve()``.
    """
    self._objective_effects = [objective_effects] if isinstance(objective_effects, str) else objective_effects
    self.build()
    if customize is not None:
        customize(self)
    return self.solve(solver_name=solver, **kwargs)

solve

solve(**kwargs: Any) -> Result

Solve the built model and return results.

Thin wrapper around linopy.Model.solve(). Call build() first.

Parameters:

Name Type Description Default

**kwargs

Any

Passed through to linopy.Model.solve().

{}
Source code in src/fluxopt/model.py
def solve(self, **kwargs: Any) -> Result:
    """Solve the built model and return results.

    Thin wrapper around ``linopy.Model.solve()``. Call :meth:`build` first.

    Args:
        **kwargs: Passed through to ``linopy.Model.solve()``.
    """
    self.m.solve(**kwargs)
    return Result.from_model(self)

Dims dataclass

Dims(
    time: DataArray,
    dt: DataArray,
    weights: DataArray,
    period: DataArray | None = None,
    period_weights: DataArray | None = None,
)

Shared model coordinates and temporal metadata.

Owns the time and period dimensions, timestep durations, and weights.

Methods:

Name Description
coords

Return shared coordinates for variable/DataArray creation.

to_dataset

Serialize to xr.Dataset.

from_dataset

Deserialize from xr.Dataset.

build

Build Dims from a time index and optional periods.

coords

coords(*, time: bool = False, period: bool = False) -> dict[str, DataArray]

Return shared coordinates for variable/DataArray creation.

Also the single point of truth for the model's variate dims used by fluxopt.types.as_dataarray: pick the reach a field supports (e.g. coords(time=True, period=True) for operational profiles, coords(period=True) for investment-time fields). When a new variate dim (e.g. scenario) is added, extend this method once and every call site picks it up.

Parameters:

Name Type Description Default

time

bool

Include the time coordinate.

False

period

bool

Include the period coordinate (no-op in single-period mode).

False
Source code in src/fluxopt/model_data.py
def coords(self, *, time: bool = False, period: bool = False) -> dict[str, xr.DataArray]:
    """Return shared coordinates for variable/DataArray creation.

    Also the single point of truth for the model's variate dims used by
    :func:`fluxopt.types.as_dataarray`: pick the reach a field supports
    (e.g. ``coords(time=True, period=True)`` for operational profiles,
    ``coords(period=True)`` for investment-time fields). When a new
    variate dim (e.g. ``scenario``) is added, extend this method once
    and every call site picks it up.

    Args:
        time: Include the time coordinate.
        period: Include the period coordinate (no-op in single-period mode).
    """
    result: dict[str, xr.DataArray] = {}
    if time:
        result['time'] = self.time
    if period and self.period is not None:
        result['period'] = self.period
    return result

to_dataset

to_dataset() -> Dataset

Serialize to xr.Dataset.

Source code in src/fluxopt/model_data.py
def to_dataset(self) -> xr.Dataset:
    """Serialize to xr.Dataset."""
    data_vars: dict[str, xr.DataArray] = {'dt': self.dt, 'weights': self.weights}
    if self.period is not None:
        data_vars['period'] = self.period
    if self.period_weights is not None:
        data_vars['period_weights'] = self.period_weights
    return xr.Dataset(data_vars)

from_dataset classmethod

from_dataset(ds: Dataset) -> Self

Deserialize from xr.Dataset.

Parameters:

Name Type Description Default

ds

Dataset

Dataset with dt, weights, and optional period fields.

required
Source code in src/fluxopt/model_data.py
@classmethod
def from_dataset(cls, ds: xr.Dataset) -> Self:
    """Deserialize from xr.Dataset.

    Args:
        ds: Dataset with dt, weights, and optional period fields.
    """
    dt = ds['dt']
    time_idx = dt.coords['time']
    return cls(
        time=time_idx,
        dt=dt,
        weights=ds['weights'],
        period=ds.get('period', None),
        period_weights=ds.get('period_weights', None),
    )

build classmethod

build(
    time: TimeIndex,
    dt: DataArray,
    periods: list[int] | Index | None = None,
    period_weights: list[float] | None = None,
) -> Self

Build Dims from a time index and optional periods.

Parameters:

Name Type Description Default

time

TimeIndex

Normalized time index.

required

dt

DataArray

Timestep durations.

required

periods

list[int] | Index | None

Integer period labels for multi-period optimization.

None

period_weights

list[float] | None

Explicit weights per period. Inferred from gaps if None.

None
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    time: TimeIndex,
    dt: xr.DataArray,
    periods: list[int] | pd.Index | None = None,
    period_weights: list[float] | None = None,
) -> Self:
    """Build Dims from a time index and optional periods.

    Args:
        time: Normalized time index.
        dt: Timestep durations.
        periods: Integer period labels for multi-period optimization.
        period_weights: Explicit weights per period. Inferred from gaps if None.
    """
    time_coord = xr.DataArray(time, dims=['time'], coords={'time': time})
    weights = xr.DataArray(np.ones(len(time)), dims=['time'], coords={'time': time}, name='weight')

    period_da: xr.DataArray | None = None
    period_weights_da: xr.DataArray | None = None
    if periods is not None:
        period_idx, period_weights_da = _compute_period_weights(periods, period_weights)
        period_da = xr.DataArray(period_idx.values, dims=['period'], coords={'period': period_idx})

    return cls(
        time=time_coord,
        dt=dt,
        weights=weights,
        period=period_da,
        period_weights=period_weights_da,
    )

ModelData dataclass

ModelData(
    flows: FlowsData,
    carriers: CarriersData,
    converters: ConvertersData | None,
    effects: EffectsData,
    storages: StoragesData | None,
    dims: Dims,
    piecewise: PiecewiseData | None = None,
)

Methods:

to_netcdf: Write model data as NetCDF groups under /model/.
from_netcdf: Read model data from NetCDF groups.
build: Build ModelData from element objects.

to_netcdf

to_netcdf(path: str | Path, *, mode: Literal['w', 'a'] = 'a') -> None

Write model data as NetCDF groups under /model/.

Parameters:

path (str | Path, required): Output file path.
mode (Literal['w', 'a'], default 'a'): Write mode ('w' to overwrite, 'a' to append).
Source code in src/fluxopt/model_data.py
def to_netcdf(self, path: str | Path, *, mode: Literal['w', 'a'] = 'a') -> None:
    """Write model data as NetCDF groups under ``/model/``.

    Args:
        path: Output file path.
        mode: Write mode ('w' to overwrite, 'a' to append).
    """
    p = Path(path)
    dataset_fields: dict[
        str,
        FlowsData | CarriersData | ConvertersData | EffectsData | StoragesData | PiecewiseData | None,
    ] = {
        'flows': self.flows,
        'carriers': self.carriers,
        'converters': self.converters,
        'effects': self.effects,
        'storages': self.storages,
        'piecewise': self.piecewise,
    }
    current_mode = mode
    for name, obj in dataset_fields.items():
        if obj is not None:
            obj.to_dataset().to_netcdf(p, mode=current_mode, group=_NC_GROUPS[name], engine='netcdf4')
            current_mode = 'a'
    self.dims.to_dataset().to_netcdf(p, mode=current_mode, group='model/meta', engine='netcdf4')
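
The mode handling in the loop above is easy to misread, so here is a minimal sketch of just that logic with the NetCDF I/O removed. plan_writes is a hypothetical helper, and the group names are placeholders rather than the real _NC_GROUPS values.

```python
from typing import Optional


def plan_writes(groups: dict[str, Optional[object]], mode: str = 'a') -> list[tuple[str, str]]:
    """Return (group, mode) pairs in write order, mimicking the loop in to_netcdf."""
    plan: list[tuple[str, str]] = []
    current_mode = mode
    for name, obj in groups.items():
        if obj is None:
            continue  # absent optional components are skipped entirely
        plan.append((name, current_mode))
        current_mode = 'a'  # only the first write may truncate the file
    plan.append(('meta', current_mode))  # dims metadata is always written last
    return plan


overwrite_plan = plan_writes({'flows': 1, 'converters': None, 'effects': 2}, mode='w')
```

With mode='w' only the first group truncates the file and every later group is appended. With the default mode='a' nothing is truncated, which is what lets Result.to_netcdf write the solution with mode='w' first and then call self.data.to_netcdf(p) on the same path.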

from_netcdf classmethod

from_netcdf(path: str | Path) -> ModelData

Read model data from NetCDF groups.

Parameters:

path (str | Path, required): Input file path.

Raises:

OSError: If no model data groups found in the file.

Source code in src/fluxopt/model_data.py
@classmethod
def from_netcdf(cls, path: str | Path) -> ModelData:
    """Read model data from NetCDF groups.

    Args:
        path: Input file path.

    Raises:
        OSError: If no model data groups found in the file.
    """
    p = Path(path)
    meta = xr.load_dataset(p, group='model/meta', engine='netcdf4')

    datasets: dict[str, xr.Dataset] = {}
    for name, group in _NC_GROUPS.items():
        try:
            datasets[name] = xr.load_dataset(p, group=group, engine='netcdf4')
        except OSError:
            datasets[name] = xr.Dataset()

    flows = FlowsData.from_dataset(datasets['flows'])
    carriers = CarriersData.from_dataset(datasets['carriers'])
    converters = ConvertersData.from_dataset(datasets['converters']) if datasets['converters'].data_vars else None
    effects = EffectsData.from_dataset(datasets['effects'])
    storages = StoragesData.from_dataset(datasets['storages']) if datasets['storages'].data_vars else None
    piecewise = PiecewiseData.from_dataset(datasets['piecewise']) if datasets['piecewise'].data_vars else None

    return cls(
        flows=flows,
        carriers=carriers,
        converters=converters,
        effects=effects,
        storages=storages,
        dims=Dims.from_dataset(meta),
        piecewise=piecewise,
    )
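
The way from_netcdf treats optional groups (a missing group becomes an empty dataset, and an empty dataset becomes None) can be sketched without NetCDF. Everything named here is hypothetical: STORE and fake_loader stand in for the file and xr.load_dataset, and the group paths are placeholders.

```python
from typing import Callable, Optional

# Hypothetical file contents: one populated group and one empty group.
STORE: dict[str, dict] = {'model/flows': {'rate': [1.0, 2.0]}, 'model/storages': {}}


def fake_loader(group: str) -> dict:
    """Raise OSError for unknown groups, like loading a missing NetCDF group."""
    if group not in STORE:
        raise OSError(f'group not found: {group}')
    return STORE[group]


def load_optional(loader: Callable[[str], dict], group: str) -> Optional[dict]:
    try:
        data = loader(group)
    except OSError:
        data = {}  # a missing group is treated like an empty dataset
    return data or None  # an empty dataset means the component is absent
```

Note that in the source the model/meta group is loaded outside any try block, so a file without it still raises OSError.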

build classmethod

build(
    timesteps: Timesteps,
    carriers: list[Carrier],
    effects: list[Effect],
    ports: list[Port],
    converters: list[Converter] | None = None,
    storages: list[Storage] | None = None,
    dt: float | list[float] | None = None,
    periods: list[int] | Index | None = None,
    period_weights: list[float] | None = None,
) -> Self

Build ModelData from element objects.

Parameters:

timesteps (Timesteps, required): Time index for the optimization horizon.
carriers (list[Carrier], required): Carrier declarations.
effects (list[Effect], required): Effects to track.
ports (list[Port], required): System boundary ports.
converters (list[Converter] | None, default None): Linear converters.
storages (list[Storage] | None, default None): Energy storages.
dt (float | list[float] | None, default None): Timestep duration in hours. Auto-derived if None.
periods (list[int] | Index | None, default None): Integer period labels for multi-period optimization.
period_weights (list[float] | None, default None): Explicit weights per period. Inferred from gaps if None.
Source code in src/fluxopt/model_data.py
@classmethod
def build(
    cls,
    timesteps: Timesteps,
    carriers: list[Carrier],
    effects: list[Effect],
    ports: list[Port],
    converters: list[Converter] | None = None,
    storages: list[Storage] | None = None,
    dt: float | list[float] | None = None,
    periods: list[int] | pd.Index | None = None,
    period_weights: list[float] | None = None,
) -> Self:
    """Build ModelData from element objects.

    Args:
        timesteps: Time index for the optimization horizon.
        carriers: Carrier declarations.
        effects: Effects to track.
        ports: System boundary ports.
        converters: Linear converters.
        storages: Energy storages.
        dt: Timestep duration in hours. Auto-derived if None.
        periods: Integer period labels for multi-period optimization.
        period_weights: Explicit weights per period. Inferred from gaps if None.
    """
    from fluxopt.elements import PENALTY_EFFECT_ID, Effect
    from fluxopt.types import compute_dt as _compute_dt

    converters = converters or []
    stor_list = storages or []
    time = normalize_timesteps(timesteps)
    dt_da = _compute_dt(time, dt)

    if not any(e.id == PENALTY_EFFECT_ID for e in effects):
        effects = [*effects, Effect(PENALTY_EFFECT_ID)]

    flows, carrier_coeff = _collect_flows(ports, converters, stor_list)
    _validate_system(effects, ports, converters, stor_list, flows, carriers)

    dims = Dims.build(time, dt_da, periods=periods, period_weights=period_weights)

    # Scalar dt for prior duration computation (use first timestep)
    dt_scalar = float(dims.dt.values[0])
    period_idx = pd.Index(dims.period.values) if dims.period is not None else None

    comp_status_items: list[tuple[str, Status, list[str]]] = [
        (s.id, s.status, [s.charging.id, s.discharging.id]) for s in stor_list if s.status is not None
    ]
    comp_status_items.extend(
        (c.id, c.conversion.status, [f.id for f in (*c.inputs, *c.outputs)])
        for c in converters
        if c.conversion is not None and c.conversion.status is not None
    )

    flows_data = FlowsData.build(
        flows,
        time,
        effects,
        dt=dt_scalar,
        period=period_idx,
        component_status_items=comp_status_items,
    )
    carriers_data = CarriersData.build(carriers, flows, carrier_coeff)
    converters_data = ConvertersData.build(converters, time)
    effects_data = EffectsData.build(effects, time, period=period_idx)
    storages_data = StoragesData.build(stor_list, time, dims.dt, effects, period=period_idx)
    piecewise_data = PiecewiseData.build(converters, time)

    return cls(
        flows=flows_data,
        carriers=carriers_data,
        converters=converters_data,
        effects=effects_data,
        storages=storages_data,
        dims=dims,
        piecewise=piecewise_data,
    )
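
One detail of build that is easy to overlook is that a penalty effect is appended automatically when the caller did not declare one. A minimal sketch of that step follows; MiniEffect, ensure_penalty and the value of PENALTY_ID are assumptions made for illustration, since the real constant PENALTY_EFFECT_ID is defined in fluxopt.elements and its value is not shown here.

```python
from dataclasses import dataclass

PENALTY_ID = 'penalty'  # hypothetical value standing in for PENALTY_EFFECT_ID


@dataclass(frozen=True)
class MiniEffect:
    """Hypothetical stand-in for Effect; only the id matters here."""

    id: str


def ensure_penalty(effects: list[MiniEffect]) -> list[MiniEffect]:
    """Return effects with a penalty effect appended if none is present."""
    if any(e.id == PENALTY_ID for e in effects):
        return effects
    return [*effects, MiniEffect(PENALTY_ID)]  # new list; the input is not mutated


user_effects = [MiniEffect('cost')]
with_penalty = ensure_penalty(user_effects)
```

Building a new list rather than appending in place means the list the caller passed in is left unchanged, which matches the `[*effects, Effect(...)]` form used in the source.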

Result dataclass

Result(
    solution: Dataset,
    data: ModelData,
    duals: Dataset = Dataset(),
    contributions: Dataset | None = None,
)

Optimization result with solution variables and model data.

Provides access to flow rates, storage levels, effect totals, and investment decisions. Key properties:

result.objective  # scalar objective value
result.flow_rates  # (flow, time) DataArray
result.flow_rate('id')  # single flow time series
result.storage_levels  # (storage, time) DataArray
result.effect_totals  # (effect,) DataArray
result.effects_temporal  # (effect, time) DataArray
result.effects_lump  # (effect,) DataArray
result.sizes  # (flow,) DataArray — invested sizes
result.storage_capacities  # (storage,) DataArray

Per-contributor effect breakdown is available via result.stats.

Parameters:

solution (Dataset, required): Solved variable values as xr.Dataset.
data (ModelData, required): ModelData used to build the optimization.
duals (Dataset, default Dataset()): Dual values (shadow prices) from the solver.
contributions (Dataset | None, default None): Cached direct per-contributor effect breakdown (no cross-effect propagation). Surfaced via result.stats.effect_contributions_direct; result.stats.effect_contributions applies Leontief on top.

Methods:

flow_rate: Get flow rate time series for a single flow.
storage_level: Get charge state time series for a single storage.
to_netcdf: Write solution and model data to NetCDF.
from_netcdf: Read a Result from a NetCDF file.
from_model: Extract solution from a solved linopy model.

Attributes:

objective (float): Objective function value.
flow_rates (DataArray): All flow rates as (flow, time) DataArray.
storage_levels (DataArray): All storage levels as (storage, time) DataArray.
sizes (DataArray): Optimized flow sizes as (flow,) DataArray.
storage_capacities (DataArray): Optimized storage capacities as (storage,) DataArray.
effect_totals (DataArray): Total effect values as (effect,) DataArray.
effects_temporal (DataArray): Per-timestep effect values as (effect, time) DataArray.
effects_lump (DataArray): Non-temporal effect values as (effect,) DataArray.
topology (dict[Literal['carriers', 'converters'], dict[str, dict[str, list[str]]]]): Carrier and converter connectivity derived from model data.
stats (StatsAccessor): Post-processing statistics accessor.
plot (PlotAccessor): Plotting accessor (requires fluxopt-plot).

objective property

objective: float

Objective function value.

flow_rates property

flow_rates: DataArray

All flow rates as (flow, time) DataArray.

storage_levels property

storage_levels: DataArray

All storage levels as (storage, time) DataArray.

sizes property

sizes: DataArray

Optimized flow sizes as (flow,) DataArray.

storage_capacities property

storage_capacities: DataArray

Optimized storage capacities as (storage,) DataArray.

effect_totals property

effect_totals: DataArray

Total effect values as (effect,) DataArray.

effects_temporal property

effects_temporal: DataArray

Per-timestep effect values as (effect, time) DataArray.

effects_lump property

effects_lump: DataArray

Non-temporal effect values as (effect,) DataArray.

topology cached property

topology: dict[
    Literal['carriers', 'converters'], dict[str, dict[str, list[str]]]
]

Carrier and converter connectivity derived from model data.

Returns a dict with carriers and converters keys, each mapping element ids to their inputs (flows that produce into the element) and outputs (flows that consume from it).
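
To make the nested shape concrete, here is what such a dict might look like for a small system. This is purely illustrative: the element ids, the flow id format, and the exact spelling of the inner keys ('inputs', 'outputs') are assumptions, and producers_of is a hypothetical helper.

```python
from typing import Literal

Topology = dict[Literal['carriers', 'converters'], dict[str, dict[str, list[str]]]]

# Hypothetical system: a gas boiler feeding a heat carrier. All ids are made up.
topology: Topology = {
    'carriers': {
        # inputs produce into the carrier, outputs consume from it
        'gas': {'inputs': ['grid(gas)'], 'outputs': ['boiler(gas)']},
        'heat': {'inputs': ['boiler(heat)'], 'outputs': ['demand(heat)']},
    },
    'converters': {
        'boiler': {'inputs': ['boiler(gas)'], 'outputs': ['boiler(heat)']},
    },
}


def producers_of(topo: Topology, carrier: str) -> list[str]:
    """Flows that produce into the given carrier."""
    return topo['carriers'][carrier]['inputs']
```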

stats cached property

stats: StatsAccessor

Post-processing statistics accessor.

plot cached property

plot: PlotAccessor

Plotting accessor (requires fluxopt-plot).

flow_rate

flow_rate(flow_id: str) -> DataArray

Get flow rate time series for a single flow.

Parameters:

flow_id (str, required): Qualified flow id.
Source code in src/fluxopt/results.py
def flow_rate(self, flow_id: str) -> xr.DataArray:
    """Get flow rate time series for a single flow.

    Args:
        flow_id: Qualified flow id.
    """
    return self.flow_rates.sel(flow=flow_id)

storage_level

storage_level(storage_id: str) -> DataArray

Get charge state time series for a single storage.

Parameters:

storage_id (str, required): Storage id.
Source code in src/fluxopt/results.py
def storage_level(self, storage_id: str) -> xr.DataArray:
    """Get charge state time series for a single storage.

    Args:
        storage_id: Storage id.
    """
    return self.storage_levels.sel(storage=storage_id)

to_netcdf

to_netcdf(path: str | Path) -> None

Write solution and model data to NetCDF.

Parameters:

path (str | Path, required): Output file path.
Source code in src/fluxopt/results.py
def to_netcdf(self, path: str | Path) -> None:
    """Write solution and model data to NetCDF.

    Args:
        path: Output file path.
    """
    p = Path(path)
    self.solution.to_netcdf(p, mode='w', engine='netcdf4')
    self.data.to_netcdf(p)
    if self.contributions is not None:
        self.contributions.to_netcdf(p, mode='a', group='contributions', engine='netcdf4')

from_netcdf classmethod

from_netcdf(path: str | Path) -> Result

Read a Result from a NetCDF file.

Parameters:

path (str | Path, required): Input file path.
Source code in src/fluxopt/results.py
@classmethod
def from_netcdf(cls, path: str | Path) -> Result:
    """Read a Result from a NetCDF file.

    Args:
        path: Input file path.
    """
    from fluxopt.model_data import ModelData

    p = Path(path)
    solution = xr.load_dataset(p, engine='netcdf4')
    data = ModelData.from_netcdf(p)

    try:
        contributions = xr.load_dataset(p, group='contributions', engine='netcdf4')
    except OSError:
        contributions = None
        import warnings

        warnings.warn(
            f"NetCDF file {p} has no 'contributions' group; per-contributor effect "
            'breakdown will be re-derived from solution + ModelData on first access. '
            'Results may differ from the original solve if the contribution-decomposition '
            'logic has changed since the file was written. Re-save the Result to refresh '
            'the cached breakdown.',
            stacklevel=2,
        )

    return cls(solution=solution, data=data, contributions=contributions)

from_model classmethod

from_model(model: FlowSystem) -> Result

Extract solution from a solved linopy model.

Parameters:

model (FlowSystem, required): Solved FlowSystem instance.
Source code in src/fluxopt/results.py
@classmethod
def from_model(cls, model: FlowSystem) -> Result:
    """Extract solution from a solved linopy model.

    Args:
        model: Solved FlowSystem instance.
    """
    sol_vars: dict[str, xr.DataArray] = {
        'flow--rate': model.flow_rate.solution,
        'effect--total': model.effect_total.solution,
        'effect--temporal': model.effect_temporal.solution,
        'effect--lump': model.effect_lump.solution,
    }

    if model.storage_level is not None:
        sol_vars['storage--level'] = model.storage_level.solution
    if model.flow_size is not None:
        sol_vars['flow--size'] = model.flow_size.solution
    if model.flow_size_indicator is not None:
        sol_vars['flow--size_indicator'] = model.flow_size_indicator.solution
    if model.storage_capacity is not None:
        sol_vars['storage--capacity'] = model.storage_capacity.solution
    if model.storage_capacity_indicator is not None:
        sol_vars['storage--size_indicator'] = model.storage_capacity_indicator.solution
    if model.invest_size is not None:
        sol_vars['invest--size'] = model.invest_size.solution
    if model.invest_build is not None:
        sol_vars['invest--build'] = model.invest_build.solution
    if model.invest_active is not None:
        sol_vars['invest--active'] = model.invest_active.solution
    if model.invest_size_at_build is not None:
        sol_vars['invest--size_at_build'] = model.invest_size_at_build.solution
    if model.flow_on is not None:
        sol_vars['flow--on'] = model.flow_on.solution
    if model.flow_startup is not None:
        sol_vars['flow--startup'] = model.flow_startup.solution
    if model.flow_shutdown is not None:
        sol_vars['flow--shutdown'] = model.flow_shutdown.solution
    if model.component_on is not None:
        sol_vars['component--on'] = model.component_on.solution
    if model.component_startup is not None:
        sol_vars['component--startup'] = model.component_startup.solution
    if model.component_shutdown is not None:
        sol_vars['component--shutdown'] = model.component_shutdown.solution

    # Piecewise auxiliary variables (from linopy.add_piecewise_formulation).
    # Stored under their linopy-generated names so they survive IO roundtrip.
    for formulation in model._piecewise.values():
        for var_name in formulation.variable_names:
            if var_name not in sol_vars:
                sol_vars[var_name] = model.m.variables[var_name].solution

    # Include custom variables added after build()
    for var_name in model.m.variables:
        if var_name not in model._builtin_var_names and var_name not in sol_vars:
            sol_vars[var_name] = model.m.variables[var_name].solution

    raw = model.m.objective.value
    obj_val = float(raw) if raw is not None else 0.0

    solution = xr.Dataset(sol_vars, attrs={'objective': obj_val})
    duals = model.m.dual

    from fluxopt.contributions import _with_cross_effects, compute_effect_contributions

    try:
        # Cache the direct (no cross-effect) view — it's the primitive both
        # accessors build on. effect_contributions applies Leontief on top
        # via _with_cross_effects, which is cheap relative to _compute_direct.
        contributions = compute_effect_contributions(solution, model.data, cross_effects=False)
        # Sanity-check at solve time: applying cross-effects must reproduce
        # the solver's effect--total. Result discarded; caches stay direct.
        _with_cross_effects(contributions, model.data, solution)
    except Exception as exc:
        import warnings

        warnings.warn(
            f'Failed to compute effect contributions during solve ({exc!r}); '
            'result.contributions will be None (re-derive via result.stats.effect_contributions)',
            stacklevel=2,
        )
        contributions = None

    return cls(solution=solution, data=model.data, duals=duals, contributions=contributions)
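
The variable-collection part of from_model follows a simple rule: keep built-in variables that exist in this model, then add anything else that is neither built-in nor already collected. A minimal sketch of that rule with plain dicts follows; collect_solution and the variable names are hypothetical, and lists stand in for solution DataArrays.

```python
from typing import Optional


def collect_solution(
    builtin: dict[str, Optional[list[float]]],
    all_vars: dict[str, list[float]],
    builtin_names: set[str],
) -> dict[str, list[float]]:
    # Keep built-in variables that exist in this model (None means "not built").
    sol = {name: values for name, values in builtin.items() if values is not None}
    # Add variables that are neither built-in nor already collected.
    for name, values in all_vars.items():
        if name not in builtin_names and name not in sol:
            sol[name] = values
    return sol


sol = collect_solution(
    builtin={'flow--rate': [1.0], 'storage--level': None},
    all_vars={'flow--rate': [1.0], 'my_custom_var': [5.0]},
    builtin_names={'flow--rate', 'storage--level'},
)
```

The second check (`name not in sol`) is what keeps a variable that was already stored under its own name from being written twice.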

IdList

IdList(items: Iterable[T])

Frozen, ordered container with access by id (str) or position (int).

Supports concatenation via +.

Parameters:

items (Iterable[T], required): Elements to store. Must have unique ids.

Raises:

ValueError: On duplicate ids.

Source code in src/fluxopt/types.py
def __init__(self, items: Iterable[T]) -> None:
    self._items: tuple[T, ...] = tuple(items)
    self._by_id: dict[str, T] = {}
    for item in self._items:
        if item.id in self._by_id:
            raise ValueError(f"Duplicate id: '{item.id}'")
        self._by_id[item.id] = item
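
Only the constructor is listed above. As a rough sketch of how access by id or position and concatenation via + could sit on top of it, consider the following. MiniIdList is hypothetical, and its __getitem__, __iter__, __len__ and __add__ are assumptions about the behaviour described in the summary, not the library's implementation.

```python
from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from typing import Generic, Protocol, TypeVar, Union


class HasId(Protocol):
    id: str


T = TypeVar('T', bound=HasId)


class MiniIdList(Generic[T]):
    """Hypothetical sketch of an immutable, ordered, id-addressable container."""

    def __init__(self, items: Iterable[T]) -> None:
        self._items: tuple[T, ...] = tuple(items)
        self._by_id: dict[str, T] = {}
        for item in self._items:
            if item.id in self._by_id:
                raise ValueError(f"Duplicate id: '{item.id}'")
            self._by_id[item.id] = item

    def __getitem__(self, key: Union[str, int]) -> T:
        # A str looks up by id, an int by position.
        return self._by_id[key] if isinstance(key, str) else self._items[key]

    def __iter__(self) -> Iterator[T]:
        return iter(self._items)

    def __len__(self) -> int:
        return len(self._items)

    def __add__(self, other: 'MiniIdList[T]') -> 'MiniIdList[T]':
        # Concatenation builds a new container, so uniqueness is re-checked.
        return MiniIdList([*self._items, *other._items])


@dataclass(frozen=True)
class Item:
    id: str


a = MiniIdList([Item('x'), Item('y')])
b = MiniIdList([Item('z')])
```

Routing + through the constructor means that concatenating two lists with a shared id fails with the same ValueError as direct construction.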

as_dataarray

as_dataarray(
    value: Variate,
    coords: Mapping[str, Any],
    *,
    name: str = 'value',
    broadcast: bool = True,
) -> DataArray

Convert a Variate to a DataArray aligned to given coordinates.

Pipeline: convert → validate dims → validate coord values → broadcast.

See Variate for accepted inputs. Pandas inputs (Series, DataFrame) follow the same convention as linopy.as_dataarray: the axis name attribute selects the corresponding target dim. For ndarray/list, the dim is selected by length (must be unambiguous). For DataArray, dims must be a subset of coords and coord values must match exactly; alignment errors are surfaced loudly, not silently masked.

Parameters:

value (Variate, required): Scalar, list, ndarray, Series, DataFrame, or DataArray.
coords (Mapping[str, Any], required): Target coordinates, e.g. {"time": idx, "period": pidx}. Used both as the reach declaration and as alignment targets.
name (str, default 'value'): Name for the resulting DataArray.
broadcast (bool, default True): Expand result to span all dimensions in coords.
Source code in src/fluxopt/types.py
def as_dataarray(
    value: Variate,
    coords: Mapping[str, Any],
    *,
    name: str = 'value',
    broadcast: bool = True,
) -> xr.DataArray:
    """Convert a Variate to a DataArray aligned to given coordinates.

    Pipeline: ``convert → validate dims → validate coord values → broadcast``.

    See :data:`Variate` for accepted inputs. Pandas inputs (``Series``,
    ``DataFrame``) follow the same convention as ``linopy.as_dataarray``: the
    axis ``name`` attribute selects the corresponding target dim. For
    ``ndarray``/``list``, the dim is selected by length (must be unambiguous).
    For ``DataArray``, dims must be a subset of *coords* and coord values must
    match exactly — alignment errors are surfaced loudly, not silently masked.

    Args:
        value: Scalar, list, ndarray, Series, DataFrame, or DataArray.
        coords: Target coordinates, e.g. ``{"time": idx, "period": pidx}``.
            Used both as the reach declaration and as alignment targets.
        name: Name for the resulting DataArray.
        broadcast: Expand result to span all dimensions in *coords*.
    """
    coord_idx = {k: v if isinstance(v, pd.Index) else pd.Index(v) for k, v in coords.items()}

    # --- scalar: 0-dim unless broadcast ---
    if isinstance(value, (int, float)):
        if not broadcast:
            return xr.DataArray(float(value), name=name)
        shape = tuple(len(v) for v in coord_idx.values())
        return xr.DataArray(
            np.full(shape, float(value)),
            dims=list(coord_idx),
            coords=coord_idx,
            name=name,
        )

    # --- 1) Convert to DataArray ---
    da: xr.DataArray
    if isinstance(value, xr.DataArray):
        da = value
    elif isinstance(value, (pd.Series, pd.DataFrame)):
        # Mirror linopy: pandas axes already carry coords; use axis.name as dim.
        # Fall back to length-matching only when no axis is named.
        named = [a.name for a in value.axes if a.name is not None]
        if len(named) == value.ndim:
            da = xr.DataArray(value)
        elif value.ndim == 1 and not named:
            return _from_unnamed_1d(np.asarray(value.values, dtype=float), coord_idx, name, broadcast)
        else:
            raise ValueError(
                f'{type(value).__name__} requires axis.name set on every axis '
                f'(got {[a.name for a in value.axes]!r}). '
                f"Set e.g. df.index.name='time', df.columns.name='period'."
            )
    elif isinstance(value, np.ndarray):
        if value.ndim != 1:
            raise ValueError(
                f'np.ndarray must be 1-D (got ndim={value.ndim}); pass an xr.DataArray '
                f'or pd.DataFrame with named axes for higher-dim inputs.'
            )
        return _from_unnamed_1d(value, coord_idx, name, broadcast)
    elif isinstance(value, list):
        return _from_unnamed_1d(np.asarray(value, dtype=float), coord_idx, name, broadcast)
    else:
        raise TypeError(f'Unsupported Variate type: {type(value)}')

    # --- 2) Validate dims are a subset of the target ---
    foreign = [str(d) for d in da.dims if d not in coord_idx]
    if foreign:
        raise ValueError(
            f'{type(value).__name__} has dims {foreign} not in target coords {list(coord_idx)}. '
            f'Rename before calling as_dataarray().'
        )

    # --- 3) Validate coord values match exactly (close the alignment gap) ---
    for d in da.dims:
        dim_name = str(d)
        if d in da.coords and not pd.Index(da.coords[d].values).equals(coord_idx[dim_name]):
            raise ValueError(
                f'Coord mismatch on dim {dim_name!r}: input coord does not equal target. '
                f"Use the same index as the model's {dim_name}."
            )

    da = da.rename(name)
    if broadcast:
        for dim, idx in coord_idx.items():
            if dim not in da.dims:
                da = da.expand_dims({dim: idx})
        da = da.transpose(*coord_idx)
    return da
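
The helper _from_unnamed_1d is not listed here, so the following is only a guess at the length-matching rule the docstring describes for lists and 1-D arrays: pick the one target dim whose length equals the input length, and refuse to choose when zero or several dims qualify. match_dim_by_length is a hypothetical name.

```python
from collections.abc import Mapping, Sequence


def match_dim_by_length(values: Sequence[float], coords: Mapping[str, Sequence]) -> str:
    """Return the single dim in coords whose length equals len(values)."""
    hits = [dim for dim, idx in coords.items() if len(idx) == len(values)]
    if len(hits) != 1:
        # Zero hits: nothing fits. Several hits: the choice would be a guess.
        raise ValueError(
            f'Cannot match length {len(values)} to exactly one dim; candidates: {hits}'
        )
    return hits[0]


coords = {'time': [0, 1, 2, 3], 'period': [2030, 2040]}
ambiguous_coords = {'time': [0, 1], 'period': [2030, 2040]}
```

With ambiguous_coords, a two-element list fits both dims. In that situation a pd.Series with index.name set, or an xr.DataArray with named dims, states the intended dim explicitly.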

optimize

optimize(
    timesteps: Timesteps,
    carriers: list[Carrier],
    effects: list[Effect],
    ports: list[Port],
    objective_effects: str | list[str],
    converters: list[Converter] | None = None,
    storages: list[Storage] | None = None,
    dt: float | list[float] | None = None,
    periods: list[int] | None = None,
    period_weights: list[float] | None = None,
    solver: str = 'highs',
    customize: Callable[[FlowSystem], None] | None = None,
    **kwargs: Any,
) -> Result

Build data, build model, optimize, return results.

Parameters:

timesteps (Timesteps, required): Time index for the optimization horizon.
carriers (list[Carrier], required): Carrier declarations.
effects (list[Effect], required): Effects to track (costs, emissions, etc.).
ports (list[Port], required): System boundary ports with imports/exports.
objective_effects (str | list[str], required): Effect name(s) to minimize. Sum of named effect totals.
converters (list[Converter] | None, default None): Linear converters between carriers.
storages (list[Storage] | None, default None): Energy storages.
dt (float | list[float] | None, default None): Timestep duration in hours. Auto-derived if None.
periods (list[int] | None, default None): Integer period labels for multi-period optimization.
period_weights (list[float] | None, default None): Explicit weights per period. Inferred from gaps if None.
solver (str, default 'highs'): Solver backend name.
customize (Callable[[FlowSystem], None] | None, default None): Optional callback to modify the linopy model between build and solve. Receives the built FlowSystem; use model.m to add variables/constraints.
**kwargs (Any, default {}): Passed through to linopy.Model.solve().
Source code in src/fluxopt/__init__.py
def optimize(
    timesteps: Timesteps,
    carriers: list[Carrier],
    effects: list[Effect],
    ports: list[Port],
    objective_effects: str | list[str],
    converters: list[Converter] | None = None,
    storages: list[Storage] | None = None,
    dt: float | list[float] | None = None,
    periods: list[int] | None = None,
    period_weights: list[float] | None = None,
    solver: str = 'highs',
    customize: Callable[[FlowSystem], None] | None = None,
    **kwargs: Any,
) -> Result:
    """Build data, build model, optimize, return results.

    Args:
        timesteps: Time index for the optimization horizon.
        carriers: Carrier declarations.
        effects: Effects to track (costs, emissions, etc.).
        ports: System boundary ports with imports/exports.
        objective_effects: Effect name(s) to minimize. Sum of named effect totals.
        converters: Linear converters between carriers.
        storages: Energy storages.
        dt: Timestep duration in hours. Auto-derived if None.
        periods: Integer period labels for multi-period optimization.
        period_weights: Explicit weights per period. Inferred from gaps if None.
        solver: Solver backend name.
        customize: Optional callback to modify the linopy model between build and solve.
            Receives the built FlowSystem; use ``model.m`` to add variables/constraints.
        **kwargs: Passed through to ``linopy.Model.solve()``.
    """
    data = ModelData.build(
        timesteps,
        carriers,
        effects,
        ports,
        converters,
        storages,
        dt,
        periods=periods,
        period_weights=period_weights,
    )
    model = FlowSystem(data)
    return model.optimize(objective_effects=objective_effects, customize=customize, solver=solver, **kwargs)
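
The customize hook is described as running between build and solve. The sketch below illustrates only that ordering with a toy class. TinyModel and add_cap are hypothetical, and strings stand in for real linopy constraints, so this shows the control flow rather than the fluxopt API.

```python
from typing import Callable, Optional


class TinyModel:
    """Hypothetical stand-in for FlowSystem; records what happens to it."""

    def __init__(self) -> None:
        self.constraints: list[str] = ['balance']  # what "build" produced
        self.solved_with: Optional[list[str]] = None

    def optimize(self, customize: Optional[Callable[['TinyModel'], None]] = None) -> list[str]:
        if customize is not None:
            customize(self)  # runs after build, before solve
        self.solved_with = list(self.constraints)  # "solve" sees the customized model
        return self.solved_with


def add_cap(model: TinyModel) -> None:
    # A real callback would add variables or constraints via model.m instead.
    model.constraints.append('emission_cap')


solved = TinyModel().optimize(customize=add_cap)
```

Because the callback receives the already built model, it can refer to everything the build step created, and whatever it adds is part of the problem handed to the solver.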