# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
from typing import List, Optional, Tuple
import numpy as np
from VeraGridEngine.Devices.Dynamic.var_factory import VarFactory
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.enumerations import ParamPowerFlowReferenceType
from VeraGridEngine.basic_structures import Vec
from VeraGridEngine.Utils.Symbolic.symbolic import (Var, Const, Expr, BinOp, CmpOp, Comparison, heaviside, hard_sat, expression2numba, get_expression_vars)
from VeraGridEngine.Utils.Symbolic.symbolic_ml import mti_hard_sat, ml_hard_sat, ml_heaviside
[docs]
def tf_to_block(var_factory: VarFactory,
num: List[Var | float],
den: List[Var | float],
x: Var | Expr,
y: Var = None,
create_state: bool = False,
name: Optional[str] = '') -> Tuple[Block, Var]:
"""
"transform definition" to block
num: list of numerator coefficients [b0, b1, ..., bm]
den: list of denominator coefficients [a0, a1, ..., an]
x: sympy symbol for input
y: sympy function of t for output
"""
if len(num) > len(den):
raise ValueError("Transfer function is improper: numerator degree > denominator degree.")
if y is None:
y = var_factory.add_var('y_' + name)
aux_eqs: List[Expr] = list()
aux_vars: List[Expr] = list()
init_eqs = dict()
diff_init_eqs = dict() # ADDED
x_save = x # Initialize x_save to avoid unbound variable error
# check if is an expression
if not isinstance(x, Var):
u = var_factory.add_var('u_' + name)
aux_eqs.append((u - x).simplify())
aux_vars.append(u)
init_eqs[u] = x
x = u
diff_vars_x = [x]
diff_vars_y = [y]
diff_vars = list()
state_eqs = list()
state_vars = list()
base_var = x
for i in range(1, len(num)):
if i == 1 and create_state:
x_new = var_factory.add_var(f"x_{i}_" + x.name)
state_vars.append(x_new)
state_eqs.append(x_save) # x is already a Var
diff_vars_x.append(x_new)
base_var = x_new
init_eqs[x_new] = Const(0.0)
else:
if base_var.diff_var is None:
new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + x.name, base_var=base_var)
diff_vars.append(new_diff)
diff_init_eqs[new_diff] = Const(0.0) # ADDED
diff_vars_x.append(base_var.diff_var)
base_var = base_var.diff_var
base_var = y
for i in range(1, len(den)):
if base_var.diff_var is None:
new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + y.name, base_var=base_var)
diff_vars.append(new_diff)
diff_init_eqs[new_diff] = Const(0.0) # ADDED
diff_vars_y.append(base_var.diff_var)
base_var = base_var.diff_var
# Create the diff equation
rhs = np.array(diff_vars_x) @ np.array(num)
lhs = np.array(diff_vars_y) @ np.array(den)
# Pure integrator-style blocks (den[0] == 0) need a derivative init that
# matches the input, otherwise the first consistency check leaves a residual.
if len(den) > 1 and den[0] == 0 and y.base_var is not None:
diff_init_eqs[y.diff_var] = x / den[1]
block = Block()
block.algebraic_vars = [y] + aux_vars
block.algebraic_eqs = [lhs - rhs] + aux_eqs
block.state_eqs = state_eqs
block.state_vars = state_vars
block.diff_vars = diff_vars
block.in_vars = [x] # Input is x
block.out_vars = [y] # Output is y
block.init_eqs = init_eqs
block.diff_init_eqs = diff_init_eqs # ADDED
return block, y
[docs]
def tf_to_diffblock_with_output(
var_factory: VarFactory,
num: List[Var | float],
den: List[Var | float],
x: Var | Expr,
y: Var = None,
create_state: bool = False,
name: Optional[str] = ''):
"""
num: list of numerator coefficients [b0, b1, ..., bm]
den: list of denominator coefficients [a0, a1, ..., an]
x: sympy symbol for input
y: sympy function of t for output
"""
if len(num) > len(den):
raise ValueError("Transfer function is improper: numerator degree > denominator degree.")
if (isinstance(num[-1], float) and num[-1] == 0) or (isinstance(den[-1], float) and den[-1] == 0):
raise ValueError("Leading coefficient of numerator or denominator cannot be zero.")
if y is None:
y = var_factory.add_var('y_' + name)
aux_eqs: List[Var | Expr] = []
aux_vars: List[Var | Expr] = []
x_save = None # Initialize x_save to avoid unbound variable error
# check if is an expression
if not isinstance(x, Var):
u = var_factory.add_var('u_' + name)
aux_eqs.append((u - x).simplify())
aux_vars.append(u)
x_save = x
x = u
diff_vars_x = [x]
diff_vars_y = [y]
diff_vars = list()
state_eqs = list()
state_vars = list()
init_eqs = dict()
diff_init_eqs = dict() # ADDED
base_var = x
for i in range(1, len(num)):
if i == 1 and create_state:
x_new = var_factory.add_var(f"x_{i}_" + x.name)
state_vars.append(x_new)
# x_save only exists if x was an expression (not a Var)
if 'x_save' in locals():
state_eqs.append(x_save)
else:
state_eqs.append(x) # x is already a Var
diff_vars_x.append(x_new)
base_var = x_new
init_eqs[x_new] = Const(0.0)
else:
if base_var.diff_var is None:
new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + x.name, base_var=base_var)
diff_vars.append(new_diff)
diff_init_eqs[new_diff] = Const(0.0) # ADDED
diff_vars_x.append(base_var.diff_var)
base_var = base_var.diff_var
base_var = y
for i in range(1, len(den)):
if base_var.diff_var is None:
new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + y.name, base_var=base_var)
diff_vars.append(new_diff)
diff_init_eqs[new_diff] = Const(0.0) # ADDED
diff_vars_y.append(base_var.diff_var)
base_var = base_var.diff_var
# Create the diff equation
rhs: Vec = np.array(diff_vars_x) @ np.array(num)
lhs: Vec = np.array(diff_vars_y) @ np.array(den)
block = Block()
block.state_eqs = state_eqs
block.state_vars = state_vars
block.algebraic_vars = [y] + aux_vars
block.algebraic_eqs = [lhs - rhs] + aux_eqs
block.diff_vars = diff_vars
block.init_eqs = init_eqs
block.diff_init_eqs = diff_init_eqs # ADDED
return block, y, x
[docs]
def tf_to_block_with_states(
var_factory: VarFactory,
num: List[Var | float],
den: List[Var | float],
x: Var | Expr,
y: Var = None, name: Optional[str] = ''):
"""
num: numerator coefficients [b0,...,bm]
den: denominator coefficients [a0,...,an], with an != 0
u: input Var
y: output Var
"""
if len(num) > len(den):
raise ValueError("Transfer function is improper: numerator degree > denominator degree.")
if num[-1] == 0 or den[-1] == 0:
raise ValueError("Leading coefficient of cannot be zero.")
if y is None:
y = var_factory.add_var('y_' + name)
aux_eqs = []
aux_vars = []
diff_init_eqs = dict() # ADDED
# check if x is a expression
if not isinstance(x, Var):
u = var_factory.add_var('u_' + name)
aux_eqs.append(u - x)
aux_vars.append(u)
x = u
# Normalize
den = [c / den[-1] for c in den]
num = [c / den[-1] for c in num]
# States
x_states = [x]
y_states = [y]
y_states.extend([var_factory.add_var(f"y{i}") for i in range(1, len(den))]) # y1,...,yn
x_states.extend([var_factory.add_var(f"x{i}") for i in range(1, len(num))]) # x1,...,xm
# Differential equations (canonical form)
diff_eqs = list()
diff_vars = list()
for i in range(len(y_states) - 1):
if y_states[i].diff_var is None:
dy = var_factory.add_diff_var(f"d_{y_states[i].name}", base_var=y_states[i])
diff_vars.append(dy)
diff_init_eqs[dy] = Const(0.0) # ADDED
diff_eqs.append(y_states[i + 1] - y_states[i].diff_var)
for i in range(len(num) - 1):
if x_states[i].diff_var is None:
dx = var_factory.add_diff_var(f"d^{i}_{x_states[i].name}", base_var=x_states[i])
diff_vars.append(dx)
diff_init_eqs[dx] = Const(0.0) # ADDED
diff_eqs.append(x_states[i + 1] - x_states[i].diff_var)
# Last equation: linear combination
last_eq = (
sum(den[i] * y_states[i] for i in range(len(y_states)))
- sum(num[i] * x_states[i] for i in range(len(x_states)))
)
diff_eqs.append(last_eq)
block = Block(
algebraic_vars=y_states + x_states[1:] + aux_vars,
algebraic_eqs=diff_eqs + aux_eqs,
)
block.diff_vars = diff_vars
block.diff_init_eqs = diff_init_eqs # ADDED
block.name = 'TF'
return block, y_states[0]
[docs]
def tf_to_block2(
var_factory: VarFactory,
num: List[Var | float],
den: List[Var | float],
x: Var | float,
y: Var = None,
name:str = ''):
"""
num: numerator coefficients [b0,...,bm]
den: denominator coefficients [a0,...,an], with an != 0
u: input Expr or Var
y: output Var
"""
if len(num) >= len(den):
raise ValueError("Transfer function is improper: numerator degree > denominator degree.")
if num[-1] == 0 or den[-1] == 0:
raise ValueError("Leading coefficient of cannot be zero.")
aux_eqs: List[Expr] = list()
aux_vars: List[Var] = list()
# check if xis a expression
if not isinstance(x, Var):
u = var_factory.add_var('u')
aux_eqs.append(u - x)
aux_vars.append(u)
x = u
# Normalize
den = [c / den[-1] for c in den]
num = [c / den[-1] for c in num]
order = len(den) # system order
x_states: List[Var] = [x] # x0
x_states.extend([var_factory.add_var(f"x{i}") for i in range(1, order + 1)]) # x1,...,xn
# Differential equations (canonical form)
state_eqs = list()
state_vars = list()
for i in range(1, order):
state_eqs.append(x_states[i])
state_vars.append(x_states[i + 1])
y_states: List[Var] = [var_factory.add_var(f"y{i}") for i in range(len(num) + 1)] # x1,...,xn
for i in range(len(num)):
state_eqs.append(y_states[i])
state_vars.append(y_states[i + 1])
# Last equation: linear combination
last_eq = (
sum(den[i] * y_states[i] for i in range(order))
- sum(num[i] * x_states[i] for i in range(len(num)))
)
aux_eqs.append(last_eq)
block = Block(
state_eqs=state_eqs,
state_vars=state_vars,
algebraic_eqs=aux_eqs,
# TODO: this is highly suspicious (better add vars to aux_vars)
algebraic_vars=x_states[0] + y_states[0] + aux_vars,
)
return block, y_states[0]
[docs]
def to_implicit(block: Block, vfactory: VarFactory) -> Block:
"""
Convert a block with explicit state equations to implicit form.
For each state variable x with equation dx/dt = f(x, y):
- Create a new algebraic variable dt_x representing the derivative
- Add dt_x to algebraic_vars
- Add equation: diff(x) - f(x, y) = 0 to algebraic_eqs
- Move x from state_vars to algebraic_vars
- Remove the state equation from state_eqs
This transforms the explicit ODE form into DAE implicit form suitable
for solvers that expect algebraic equations.
"""
# Process each state variable and its corresponding state equation
# We iterate backwards to safely modify lists while iterating
for i, state_var in enumerate(block.state_vars):
state_eq = block.state_eqs[i]
# Create a new algebraic variable for the derivative
if state_var.diff_var == None:
dt_var = vfactory.add_diff_var(f"dt_{state_var.name}", base_var=state_var)
block.diff_vars.append(dt_var)
else:
dt_var = state_var.diff_var
implicit_eq = dt_var - state_eq
block.algebraic_vars.append(state_var)
block.algebraic_eqs.append(implicit_eq)
#block.init_eqs[dt_var] = block.init_eqs[state_var]
# The state_vars become algebraic_vars since they now appear in algebraic equations
block.state_vars = list()
block.state_eqs = list()
# Recursively process children
for b in block.children:
to_implicit(b, vfactory)
return block
def _extract_rhs_from_implicit_eq(eq: Expr, diff_var: Var) -> Expr | None:
"""
Try to recover rhs from an implicit algebraic equation of the form:
diff_var - rhs = 0
Returns rhs when recognized, otherwise None.
"""
if isinstance(eq, BinOp):
if eq.op == "-":
if isinstance(eq.left, Var) and eq.left.uid == diff_var.uid:
return eq.right
if isinstance(eq.right, Var) and eq.right.uid == diff_var.uid:
return eq.left
elif eq.op == "+":
if isinstance(eq.left, Var) and eq.left.uid == diff_var.uid:
return (-eq.right).simplify()
if isinstance(eq.right, Var) and eq.right.uid == diff_var.uid:
return (-eq.left).simplify()
return None
[docs]
def to_explicit(block: Block, vfactory: VarFactory) -> Block:
"""
Convert an implicit DAE-like block back to explicit state equations.
For each diff var `dt_x` in `block.diff_vars`, look for an algebraic equation
equivalent to `dt_x - f(...) = 0`. When found:
- create/ensure state equation `dx/dt = f(...)` for base var `x`
- remove that implicit algebraic equation
- substitute `dt_x` by `f(...)` in remaining algebraic equations
- remove all recovered diff vars from `block.diff_vars`
Recurses through child blocks.
"""
block.unify_blocks()
for i, diff_var in enumerate(block.diff_vars):
new_diff_var = vfactory.add_var(f"dt_{diff_var.base_var.name}")
if diff_var.base_var not in block.state_vars:
block.state_vars.append(diff_var.base_var)
block.state_eqs.append(diff_var)
block.algebraic_vars.remove(diff_var.base_var)
else:
state_eq = block.state_eqs[block.state_vars.index(diff_var.base_var)]
block.algebraic_eqs.append(new_diff_var - state_eq)
block.algebraic_vars.append(new_diff_var)
block.update_equations(diff_var, new_diff_var)
block.diff_vars = list()
for child in block.children:
to_explicit(child, vfactory)
return block
[docs]
def tf_to_diffblock_with_antiwindup(
var_factory: VarFactory,
num: List[Var | float],
den: List[Var | float],
x: Var,
y: Var = None,
name: Optional[str] = '',
sat_min: Expr = Const(float(-1e6)),
sat_max: Expr = Const(float(1e6)),
multilinear=False,
PI: bool = False):
"""
num: numerator coefficients [b0, ..., bm]
den: denominator coefficients [a0, ..., an]
x: input (Var or Expr)
y: output Var
sat_min, sat_max: saturation limits for anti-windup
"""
if len(num) > len(den):
raise ValueError("Transfer function is improper: numerator degree > denominator degree.")
if num[-1] == 0 or den[-1] == 0:
raise ValueError("Leading coefficient cannot be zero.")
if y is None:
y = var_factory.add_var('y_' + name)
aux_eqs = []
aux_vars = []
diff_init_eqs = dict() # ADDED
# If x is an expression, wrap it in a Var
if not isinstance(x, Var):
u = var_factory.add_var('u_' + name)
aux_eqs.append((u - x).simplify())
aux_vars.append(u)
x = u
diff_vars_x = [x]
diff_vars_y = [y]
diff_vars = []
# Derivative chain for x
base_var = x
for i in range(1, len(num)):
if base_var.diff_var is None:
new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + x.name, base_var=base_var)
diff_vars.append(new_diff)
diff_init_eqs[new_diff] = Const(0.0) # ADDED
diff_vars_x.append(x.diff_var)
base_var = x.diff_var
# Derivative chain for y
base_var = y
for i in range(1, len(den)):
if base_var.diff_var is None:
new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + y.name, base_var=base_var)
diff_vars.append(new_diff)
diff_init_eqs[new_diff] = Const(0.0) # ADDED
diff_vars_y.append(y.diff_var)
base_var = y.diff_var
is_simple_integrator = (
len(num) == 1
and len(den) == 2
and den[0] == 0
)
if is_simple_integrator:
# 1/(sT): dy/dt = (num[0]/den[1]) * u
f = (num[0] * diff_vars_x[0]) / den[1]
else:
# Generic TF fallback
rhs = Const(0.0)
lhs_wo_dy = Const(0.0)
if not PI:
for i, term in enumerate(np.array(den) * np.array(diff_vars_y)):
if i != 1:
lhs_wo_dy += term
for term in np.array(num) * np.array(diff_vars_x):
rhs += term
f = (rhs - lhs_wo_dy) / Const(den[1])
# Integrator with non-windup limiter (Figure E.2 behavior):
# - freeze when already at upper bound and f > 0
# - freeze when already at lower bound and f < 0
# - otherwise integrate with dy/dt = f
if not multilinear:
at_or_above_min = heaviside(y - sat_min)
at_or_below_max = heaviside(sat_max - y)
f_pos = heaviside(f)
f_neg = Const(1.0) - f_pos
allow_integrate = heaviside(
at_or_above_min * at_or_below_max
+ (Const(1.0) - at_or_below_max) * f_neg
+ (Const(1.0) - at_or_above_min) * f_pos
)
eq_main = (diff_vars_y[1] - allow_integrate * f).simplify()
algebraic_vars = [y] + aux_vars
algebraic_eqs = [eq_main] + aux_eqs
init_eqs = {}
else:
sat_block, y_sat = mti_hard_sat(
vf=var_factory,
u=y,
ul=sat_min,
uu=sat_max,
yl=sat_min,
yu=sat_max,
name=f"{name}_aw_sat",
)
hv_up_block, at_upper = ml_heaviside(var_factory, y - sat_max, name=f"{name}_aw_up")
hv_lo_block, at_lower = ml_heaviside(var_factory, sat_min - y, name=f"{name}_aw_lo")
hv_f_block, f_pos = ml_heaviside(var_factory, f, name=f"{name}_aw_f")
f_neg = Const(1.0) - f_pos
freeze = at_upper * f_pos + at_lower * f_neg
eq_main = (diff_vars_y[1] - (Const(1.0) - freeze) * f).simplify()
algebraic_vars = [y_sat, y] + aux_vars + list(sat_block.algebraic_vars) + list(hv_up_block.algebraic_vars) + list(hv_lo_block.algebraic_vars) + list(hv_f_block.algebraic_vars)
algebraic_eqs = [eq_main, y_sat - y] + aux_eqs + list(sat_block.algebraic_eqs) + list(hv_up_block.algebraic_eqs) + list(hv_lo_block.algebraic_eqs) + list(hv_f_block.algebraic_eqs)
init_eqs = {**dict(sat_block.init_eqs), **dict(hv_up_block.init_eqs), **dict(hv_lo_block.init_eqs), **dict(hv_f_block.init_eqs)}
block = Block(
name=name,
algebraic_vars=algebraic_vars,
algebraic_eqs=algebraic_eqs,
diff_vars=diff_vars,
init_eqs=init_eqs,
)
block.diff_init_eqs = diff_init_eqs # ADDED
return block, y
[docs]
def integrator_with_non_windup(
var_factory: VarFactory,
x: Var | Expr,
T: Expr | float,
y: Var = None,
name: Optional[str] = '',
sat_min: Expr = Const(float(-1e6)),
sat_max: Expr = Const(float(1e6)),
multilinear: bool = False):
"""
Integrator with non-windup limiter (Figure E.2 style):
dy/dt = (1/T) * x
with clamping logic:
- if y >= sat_max and dy/dt would be positive -> dy/dt = 0
- if y <= sat_min and dy/dt would be negative -> dy/dt = 0
- otherwise dy/dt = (1/T) * x
"""
if y is None:
y = var_factory.add_var('y_' + name)
aux_eqs = []
aux_vars = []
init_eqs = {}
if not isinstance(x, Var):
u = var_factory.add_var('u_' + name)
aux_eqs.append((u - x).simplify())
aux_vars.append(u)
init_eqs[u] = x
x = u
if y.diff_var is None:
dy = var_factory.add_diff_var(name=f'dt_1_{y.name}', base_var=y)
else:
dy = y.diff_var
f = x / T
if not multilinear:
at_or_above_min = heaviside(y - sat_min)
at_or_below_max = heaviside(sat_max - y)
f_pos = heaviside(f)
f_neg = Const(1.0) - f_pos
allow_integrate = heaviside(
at_or_above_min * at_or_below_max
+ (Const(1.0) - at_or_below_max) * f_neg
+ (Const(1.0) - at_or_above_min) * f_pos
)
eq_main = (dy - allow_integrate * f).simplify()
block = Block(
name=name,
algebraic_vars=[y] + aux_vars,
algebraic_eqs=[eq_main] + aux_eqs,
diff_vars=[dy],
init_eqs=init_eqs,
)
else:
sat_block, y_sat_expr = ml_hard_sat(
vf=var_factory,
u=y,
u_min=sat_min,
u_max=sat_max,
name=f"{name}_aw_sat",
)
y_sat = var_factory.add_var(f"y_sat_{name}")
hv_up_block, at_upper = ml_heaviside(var_factory, y - sat_max, name=f"{name}_aw_up")
hv_lo_block, at_lower = ml_heaviside(var_factory, sat_min - y, name=f"{name}_aw_lo")
hv_f_block, f_pos = ml_heaviside(var_factory, f, name=f"{name}_aw_f")
f_neg = Const(1.0) - f_pos
freeze = at_upper * f_pos + at_lower * f_neg
eq_main = (dy - (Const(1.0) - freeze) * f).simplify()
block = Block(
name=name,
algebraic_vars=[y_sat, y] + aux_vars + list(sat_block.algebraic_vars) + list(hv_up_block.algebraic_vars) + list(hv_lo_block.algebraic_vars) + list(hv_f_block.algebraic_vars),
algebraic_eqs=[eq_main, y_sat - y_sat_expr, y_sat - y] + aux_eqs + list(sat_block.algebraic_eqs) + list(hv_up_block.algebraic_eqs) + list(hv_lo_block.algebraic_eqs) + list(hv_f_block.algebraic_eqs),
diff_vars=[dy],
init_eqs={
**init_eqs,
**dict(sat_block.init_eqs),
**dict(hv_up_block.init_eqs),
**dict(hv_lo_block.init_eqs),
**dict(hv_f_block.init_eqs),
y_sat: hard_sat(y, sat_min, sat_max),
},
)
return block, y
[docs]
def integrator_with_windup(
var_factory: VarFactory,
x: Var | Expr,
T: Expr | float,
y: Var = None,
name: Optional[str] = '',
sat_min: Expr = Const(float(-1e6)),
sat_max: Expr = Const(float(1e6)),
multilinear: bool = False):
"""
Simple 1/(sT) integrator followed by output saturation (windup behavior).
Internal state:
dz/dt = x / T
Output:
y = sat(z)
"""
if y is None:
y = var_factory.add_var('y_' + name)
z = var_factory.add_var('z_' + name)
dz = var_factory.add_diff_var(name=f'dt_1_{z.name}', base_var=z)
aux_eqs = []
aux_vars = []
init_eqs = {z: Const(0.0)}
if not isinstance(x, Var):
u = var_factory.add_var('u_' + name)
aux_eqs.append((u - x).simplify())
aux_vars.append(u)
init_eqs[u] = x
x = u
eq_int = (dz - (x / T)).simplify()
if not multilinear:
eq_sat = (y - hard_sat(z, sat_min, sat_max)).simplify()
block = Block(
name=name,
algebraic_vars=[y, z] + aux_vars,
algebraic_eqs=[eq_int, eq_sat] + aux_eqs,
diff_vars=[dz],
init_eqs={**init_eqs, y: hard_sat(z, sat_min, sat_max)},
)
else:
sat_block, y_sat_expr = ml_hard_sat(
vf=var_factory,
u=z,
u_min=sat_min,
u_max=sat_max,
name=f"{name}_windup_sat",
)
block = Block(
name=name,
algebraic_vars=[y, z] + aux_vars + list(sat_block.algebraic_vars),
algebraic_eqs=[eq_int, y - y_sat_expr] + aux_eqs + list(sat_block.algebraic_eqs),
diff_vars=[dz],
init_eqs={**init_eqs, **dict(sat_block.init_eqs), y: hard_sat(z, sat_min, sat_max)},
)
return block, y
[docs]
def tf_to_diffblock_with_antiwindup_by_feedback(
var_factory: VarFactory,
num: List[Var | float],
den: List[Var | float],
x: Var,
y: Var = None,
name: Optional[str] = '',
sat_min: Expr = Const(-1e6),
sat_max: Expr = Const(1e6),
Kaw: Expr = Const(10.0), # anti-windup gain (1/Tt)
):
"""
Implements a back-calculation anti-windup controller using (u_sat - u) as feedback input.
Works better for implicit solvers
num: numerator coefficients [b0, ..., bm]
den: denominator coefficients [a0, ..., an]
x: input signal
y: output (saturated)
"""
if len(num) > len(den):
raise ValueError("Improper TF")
if y is None:
y = var_factory.add_var(f'y_{name}')
aux_eqs = []
aux_vars = []
# Wrap input expression
if not isinstance(x, Var):
u_in = var_factory.add_var(f'u_in_{name}')
aux_eqs.append((u_in - x).simplify())
aux_vars.append(u_in)
x = u_in
# --------------------------------------------------
# Build derivative chains
# --------------------------------------------------
diff_vars = []
diff_x = [x]
diff_y = [y]
base = x
for i in range(1, len(num)):
dv = var_factory.add_diff_var(f'dt_{i}_{x.name}', base_var=base)
diff_vars.append(dv)
diff_x.append(dv)
base = dv
base = y
for i in range(1, len(den)):
dv = var_factory.add_diff_var(f'dt_{i}_{y.name}', base_var=base)
diff_vars.append(dv)
diff_y.append(dv)
base = dv
# --------------------------------------------------
# Unsaturated output (internal)
# --------------------------------------------------
u = var_factory.add_var(f'u_{name}')
aux_vars.append(u)
tf_rhs = np.dot(num, diff_x)
tf_lhs = np.dot(den, diff_y)
aux_eqs.append((u - tf_rhs).simplify())
# --------------------------------------------------
# Saturation
# --------------------------------------------------
u_sat = var_factory.add_var(f'u_sat_{name}')
aux_vars.append(u_sat)
sat_expr = (
sat_min
+ (u - sat_min) * heaviside(u - sat_min)
- (u - sat_max) * heaviside(u - sat_max)
)
aux_eqs.append((u_sat - sat_expr).simplify())
# --------------------------------------------------
# Main differential equation
# Add anti-windup ONLY to integrator (dy/dt term)
# --------------------------------------------------
lhs = Const(0)
rhs = Const(0)
for i, term in enumerate(den * np.array(diff_y)):
lhs += term
for term in num * np.array(diff_x):
rhs += term
# Anti-windup injection
rhs += Kaw * (u_sat - u)
eq_main = (lhs - rhs).simplify()
block = Block(
name=name,
algebraic_vars=[y, u, u_sat] + aux_vars,
algebraic_eqs=[eq_main] + aux_eqs,
diff_vars=diff_vars,
)
return block, y
[docs]
def discrete_control_block(
var_factory: VarFactory,
m: Var,
delta_m: Var,
m_max: Var,
m_min: Var,
v: Var,
v_ref: Var,
delta_v: Var,
ts: Var, # seconds
name: Optional[str] = ''
) -> Tuple[Block, Var]:
m_last = var_factory.add_var(f'm_last_{name}')
t_signal = var_factory.add_var(f't_signal_{name}')
tick = (t_signal >= ts).to_expression()
# your switching increment (only evaluated on tick)
inc = delta_m * (v - v_ref <= delta_v).to_expression() \
- delta_m * (v - v_ref >= delta_v).to_expression()
changed = (1 - Comparison(m_last, CmpOp.EQ, m).to_expression())
dt = var_factory.add_var(f'dt')
block = Block(
name=name + '_input_controlled_switch',
discrete_eqs={
t_signal: changed*(t_signal + dt),
m:hard_sat(m + inc * tick, m_min, m_max),
m_last: m,
},
api_obj_mapping={dt: ParamPowerFlowReferenceType.dt}
)
return block, m
[docs]
def deadband_block(
var_factory: VarFactory,
x: Var | Expr,
y: Var = None,
name: Optional[str] = '',
deadband: Expr = Const(0.1)) -> Tuple[Block, Var]:
"""
Creates a deadband function block.
The deadband function returns:
- 0 when input is within the deadband [-deadband, +deadband]
- input - deadband when input > deadband
- input + deadband when input < -deadband
Parameters:
-----------
var_factory: VarFactory instance for creating variables
x: input signal (Var or Expr)
y: output variable (optional, created if None)
name: name prefix for variables
deadband: deadband threshold (default 0.1)
Returns:
--------
Tuple of (Block, output_var)
"""
if y is None:
y = var_factory.add_var('y_deadband_' + name)
aux_eqs = []
aux_vars = []
# Wrap input expression if needed
if not isinstance(x, Var):
u = var_factory.add_var('u_deadband_' + name)
aux_eqs.append((u - x).simplify())
aux_vars.append(u)
x = u
# Deadband logic using Heaviside step functions
# y = (u - deadband) * H(u - deadband) + (u + deadband) * H(-deadband - u)
# = (u - deadband) * H(u - deadband) + (u + deadband) * H(-u - deadband)
h_pos = heaviside(x - deadband)
h_neg = heaviside(-x - deadband)
# Output expression
output_expr = (x - deadband) * h_pos + (x + deadband) * h_neg
# Algebraic equation: y - output_expr = 0
eq = (y - output_expr).simplify()
block = Block(
name=name + '_deadband',
algebraic_vars=[y] + aux_vars,
algebraic_eqs=[eq] + aux_eqs,
diff_vars=[]
)
return block, y