Source code for VeraGridEngine.Utils.Symbolic.block_helpers

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from __future__ import annotations

from typing import List, Optional, Tuple

import numpy as np
from VeraGridEngine.Devices.Dynamic.var_factory import VarFactory
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.enumerations import ParamPowerFlowReferenceType
from VeraGridEngine.basic_structures import Vec
from VeraGridEngine.Utils.Symbolic.symbolic import (Var, Const, Expr, BinOp, CmpOp, Comparison, heaviside, hard_sat, expression2numba, get_expression_vars)
from VeraGridEngine.Utils.Symbolic.symbolic_ml import mti_hard_sat, ml_hard_sat, ml_heaviside


[docs] def tf_to_block(var_factory: VarFactory, num: List[Var | float], den: List[Var | float], x: Var | Expr, y: Var = None, create_state: bool = False, name: Optional[str] = '') -> Tuple[Block, Var]: """ "transform definition" to block num: list of numerator coefficients [b0, b1, ..., bm] den: list of denominator coefficients [a0, a1, ..., an] x: sympy symbol for input y: sympy function of t for output """ if len(num) > len(den): raise ValueError("Transfer function is improper: numerator degree > denominator degree.") if y is None: y = var_factory.add_var('y_' + name) aux_eqs: List[Expr] = list() aux_vars: List[Expr] = list() init_eqs = dict() diff_init_eqs = dict() # ADDED x_save = x # Initialize x_save to avoid unbound variable error # check if is an expression if not isinstance(x, Var): u = var_factory.add_var('u_' + name) aux_eqs.append((u - x).simplify()) aux_vars.append(u) init_eqs[u] = x x = u diff_vars_x = [x] diff_vars_y = [y] diff_vars = list() state_eqs = list() state_vars = list() base_var = x for i in range(1, len(num)): if i == 1 and create_state: x_new = var_factory.add_var(f"x_{i}_" + x.name) state_vars.append(x_new) state_eqs.append(x_save) # x is already a Var diff_vars_x.append(x_new) base_var = x_new init_eqs[x_new] = Const(0.0) else: if base_var.diff_var is None: new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + x.name, base_var=base_var) diff_vars.append(new_diff) diff_init_eqs[new_diff] = Const(0.0) # ADDED diff_vars_x.append(base_var.diff_var) base_var = base_var.diff_var base_var = y for i in range(1, len(den)): if base_var.diff_var is None: new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + y.name, base_var=base_var) diff_vars.append(new_diff) diff_init_eqs[new_diff] = Const(0.0) # ADDED diff_vars_y.append(base_var.diff_var) base_var = base_var.diff_var # Create the diff equation rhs = np.array(diff_vars_x) @ np.array(num) lhs = np.array(diff_vars_y) @ np.array(den) # Pure integrator-style blocks (den[0] == 0) need a derivative init that # matches the input, otherwise the first consistency check leaves a residual. if len(den) > 1 and den[0] == 0 and y.base_var is not None: diff_init_eqs[y.diff_var] = x / den[1] block = Block() block.algebraic_vars = [y] + aux_vars block.algebraic_eqs = [lhs - rhs] + aux_eqs block.state_eqs = state_eqs block.state_vars = state_vars block.diff_vars = diff_vars block.in_vars = [x] # Input is x block.out_vars = [y] # Output is y block.init_eqs = init_eqs block.diff_init_eqs = diff_init_eqs # ADDED return block, y
[docs] def tf_to_diffblock_with_output( var_factory: VarFactory, num: List[Var | float], den: List[Var | float], x: Var | Expr, y: Var = None, create_state: bool = False, name: Optional[str] = ''): """ num: list of numerator coefficients [b0, b1, ..., bm] den: list of denominator coefficients [a0, a1, ..., an] x: sympy symbol for input y: sympy function of t for output """ if len(num) > len(den): raise ValueError("Transfer function is improper: numerator degree > denominator degree.") if (isinstance(num[-1], float) and num[-1] == 0) or (isinstance(den[-1], float) and den[-1] == 0): raise ValueError("Leading coefficient of numerator or denominator cannot be zero.") if y is None: y = var_factory.add_var('y_' + name) aux_eqs: List[Var | Expr] = [] aux_vars: List[Var | Expr] = [] x_save = None # Initialize x_save to avoid unbound variable error # check if is an expression if not isinstance(x, Var): u = var_factory.add_var('u_' + name) aux_eqs.append((u - x).simplify()) aux_vars.append(u) x_save = x x = u diff_vars_x = [x] diff_vars_y = [y] diff_vars = list() state_eqs = list() state_vars = list() init_eqs = dict() diff_init_eqs = dict() # ADDED base_var = x for i in range(1, len(num)): if i == 1 and create_state: x_new = var_factory.add_var(f"x_{i}_" + x.name) state_vars.append(x_new) # x_save only exists if x was an expression (not a Var) if 'x_save' in locals(): state_eqs.append(x_save) else: state_eqs.append(x) # x is already a Var diff_vars_x.append(x_new) base_var = x_new init_eqs[x_new] = Const(0.0) else: if base_var.diff_var is None: new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + x.name, base_var=base_var) diff_vars.append(new_diff) diff_init_eqs[new_diff] = Const(0.0) # ADDED diff_vars_x.append(base_var.diff_var) base_var = base_var.diff_var base_var = y for i in range(1, len(den)): if base_var.diff_var is None: new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + y.name, base_var=base_var) diff_vars.append(new_diff) diff_init_eqs[new_diff] = Const(0.0) # ADDED diff_vars_y.append(base_var.diff_var) base_var = base_var.diff_var # Create the diff equation rhs: Vec = np.array(diff_vars_x) @ np.array(num) lhs: Vec = np.array(diff_vars_y) @ np.array(den) block = Block() block.state_eqs = state_eqs block.state_vars = state_vars block.algebraic_vars = [y] + aux_vars block.algebraic_eqs = [lhs - rhs] + aux_eqs block.diff_vars = diff_vars block.init_eqs = init_eqs block.diff_init_eqs = diff_init_eqs # ADDED return block, y, x
[docs] def tf_to_block_with_states( var_factory: VarFactory, num: List[Var | float], den: List[Var | float], x: Var | Expr, y: Var = None, name: Optional[str] = ''): """ num: numerator coefficients [b0,...,bm] den: denominator coefficients [a0,...,an], with an != 0 u: input Var y: output Var """ if len(num) > len(den): raise ValueError("Transfer function is improper: numerator degree > denominator degree.") if num[-1] == 0 or den[-1] == 0: raise ValueError("Leading coefficient of cannot be zero.") if y is None: y = var_factory.add_var('y_' + name) aux_eqs = [] aux_vars = [] diff_init_eqs = dict() # ADDED # check if x is a expression if not isinstance(x, Var): u = var_factory.add_var('u_' + name) aux_eqs.append(u - x) aux_vars.append(u) x = u # Normalize den = [c / den[-1] for c in den] num = [c / den[-1] for c in num] # States x_states = [x] y_states = [y] y_states.extend([var_factory.add_var(f"y{i}") for i in range(1, len(den))]) # y1,...,yn x_states.extend([var_factory.add_var(f"x{i}") for i in range(1, len(num))]) # x1,...,xm # Differential equations (canonical form) diff_eqs = list() diff_vars = list() for i in range(len(y_states) - 1): if y_states[i].diff_var is None: dy = var_factory.add_diff_var(f"d_{y_states[i].name}", base_var=y_states[i]) diff_vars.append(dy) diff_init_eqs[dy] = Const(0.0) # ADDED diff_eqs.append(y_states[i + 1] - y_states[i].diff_var) for i in range(len(num) - 1): if x_states[i].diff_var is None: dx = var_factory.add_diff_var(f"d^{i}_{x_states[i].name}", base_var=x_states[i]) diff_vars.append(dx) diff_init_eqs[dx] = Const(0.0) # ADDED diff_eqs.append(x_states[i + 1] - x_states[i].diff_var) # Last equation: linear combination last_eq = ( sum(den[i] * y_states[i] for i in range(len(y_states))) - sum(num[i] * x_states[i] for i in range(len(x_states))) ) diff_eqs.append(last_eq) block = Block( algebraic_vars=y_states + x_states[1:] + aux_vars, algebraic_eqs=diff_eqs + aux_eqs, ) block.diff_vars = diff_vars block.diff_init_eqs = diff_init_eqs # ADDED block.name = 'TF' return block, y_states[0]
[docs] def tf_to_block2( var_factory: VarFactory, num: List[Var | float], den: List[Var | float], x: Var | float, y: Var = None, name:str = ''): """ num: numerator coefficients [b0,...,bm] den: denominator coefficients [a0,...,an], with an != 0 u: input Expr or Var y: output Var """ if len(num) >= len(den): raise ValueError("Transfer function is improper: numerator degree > denominator degree.") if num[-1] == 0 or den[-1] == 0: raise ValueError("Leading coefficient of cannot be zero.") aux_eqs: List[Expr] = list() aux_vars: List[Var] = list() # check if xis a expression if not isinstance(x, Var): u = var_factory.add_var('u') aux_eqs.append(u - x) aux_vars.append(u) x = u # Normalize den = [c / den[-1] for c in den] num = [c / den[-1] for c in num] order = len(den) # system order x_states: List[Var] = [x] # x0 x_states.extend([var_factory.add_var(f"x{i}") for i in range(1, order + 1)]) # x1,...,xn # Differential equations (canonical form) state_eqs = list() state_vars = list() for i in range(1, order): state_eqs.append(x_states[i]) state_vars.append(x_states[i + 1]) y_states: List[Var] = [var_factory.add_var(f"y{i}") for i in range(len(num) + 1)] # x1,...,xn for i in range(len(num)): state_eqs.append(y_states[i]) state_vars.append(y_states[i + 1]) # Last equation: linear combination last_eq = ( sum(den[i] * y_states[i] for i in range(order)) - sum(num[i] * x_states[i] for i in range(len(num))) ) aux_eqs.append(last_eq) block = Block( state_eqs=state_eqs, state_vars=state_vars, algebraic_eqs=aux_eqs, # TODO: this is highly suspicious (better add vars to aux_vars) algebraic_vars=x_states[0] + y_states[0] + aux_vars, ) return block, y_states[0]
[docs] def to_implicit(block: Block, vfactory: VarFactory) -> Block: """ Convert a block with explicit state equations to implicit form. For each state variable x with equation dx/dt = f(x, y): - Create a new algebraic variable dt_x representing the derivative - Add dt_x to algebraic_vars - Add equation: diff(x) - f(x, y) = 0 to algebraic_eqs - Move x from state_vars to algebraic_vars - Remove the state equation from state_eqs This transforms the explicit ODE form into DAE implicit form suitable for solvers that expect algebraic equations. """ # Process each state variable and its corresponding state equation # We iterate backwards to safely modify lists while iterating for i, state_var in enumerate(block.state_vars): state_eq = block.state_eqs[i] # Create a new algebraic variable for the derivative if state_var.diff_var == None: dt_var = vfactory.add_diff_var(f"dt_{state_var.name}", base_var=state_var) block.diff_vars.append(dt_var) else: dt_var = state_var.diff_var implicit_eq = dt_var - state_eq block.algebraic_vars.append(state_var) block.algebraic_eqs.append(implicit_eq) #block.init_eqs[dt_var] = block.init_eqs[state_var] # The state_vars become algebraic_vars since they now appear in algebraic equations block.state_vars = list() block.state_eqs = list() # Recursively process children for b in block.children: to_implicit(b, vfactory) return block
def _extract_rhs_from_implicit_eq(eq: Expr, diff_var: Var) -> Expr | None: """ Try to recover rhs from an implicit algebraic equation of the form: diff_var - rhs = 0 Returns rhs when recognized, otherwise None. """ if isinstance(eq, BinOp): if eq.op == "-": if isinstance(eq.left, Var) and eq.left.uid == diff_var.uid: return eq.right if isinstance(eq.right, Var) and eq.right.uid == diff_var.uid: return eq.left elif eq.op == "+": if isinstance(eq.left, Var) and eq.left.uid == diff_var.uid: return (-eq.right).simplify() if isinstance(eq.right, Var) and eq.right.uid == diff_var.uid: return (-eq.left).simplify() return None
[docs] def to_explicit(block: Block, vfactory: VarFactory) -> Block: """ Convert an implicit DAE-like block back to explicit state equations. For each diff var `dt_x` in `block.diff_vars`, look for an algebraic equation equivalent to `dt_x - f(...) = 0`. When found: - create/ensure state equation `dx/dt = f(...)` for base var `x` - remove that implicit algebraic equation - substitute `dt_x` by `f(...)` in remaining algebraic equations - remove all recovered diff vars from `block.diff_vars` Recurses through child blocks. """ block.unify_blocks() for i, diff_var in enumerate(block.diff_vars): new_diff_var = vfactory.add_var(f"dt_{diff_var.base_var.name}") if diff_var.base_var not in block.state_vars: block.state_vars.append(diff_var.base_var) block.state_eqs.append(diff_var) block.algebraic_vars.remove(diff_var.base_var) else: state_eq = block.state_eqs[block.state_vars.index(diff_var.base_var)] block.algebraic_eqs.append(new_diff_var - state_eq) block.algebraic_vars.append(new_diff_var) block.update_equations(diff_var, new_diff_var) block.diff_vars = list() for child in block.children: to_explicit(child, vfactory) return block
[docs] def tf_to_diffblock_with_antiwindup( var_factory: VarFactory, num: List[Var | float], den: List[Var | float], x: Var, y: Var = None, name: Optional[str] = '', sat_min: Expr = Const(float(-1e6)), sat_max: Expr = Const(float(1e6)), multilinear=False, PI: bool = False): """ num: numerator coefficients [b0, ..., bm] den: denominator coefficients [a0, ..., an] x: input (Var or Expr) y: output Var sat_min, sat_max: saturation limits for anti-windup """ if len(num) > len(den): raise ValueError("Transfer function is improper: numerator degree > denominator degree.") if num[-1] == 0 or den[-1] == 0: raise ValueError("Leading coefficient cannot be zero.") if y is None: y = var_factory.add_var('y_' + name) aux_eqs = [] aux_vars = [] diff_init_eqs = dict() # ADDED # If x is an expression, wrap it in a Var if not isinstance(x, Var): u = var_factory.add_var('u_' + name) aux_eqs.append((u - x).simplify()) aux_vars.append(u) x = u diff_vars_x = [x] diff_vars_y = [y] diff_vars = [] # Derivative chain for x base_var = x for i in range(1, len(num)): if base_var.diff_var is None: new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + x.name, base_var=base_var) diff_vars.append(new_diff) diff_init_eqs[new_diff] = Const(0.0) # ADDED diff_vars_x.append(x.diff_var) base_var = x.diff_var # Derivative chain for y base_var = y for i in range(1, len(den)): if base_var.diff_var is None: new_diff = var_factory.add_diff_var(name=f'dt_{i}_' + y.name, base_var=base_var) diff_vars.append(new_diff) diff_init_eqs[new_diff] = Const(0.0) # ADDED diff_vars_y.append(y.diff_var) base_var = y.diff_var is_simple_integrator = ( len(num) == 1 and len(den) == 2 and den[0] == 0 ) if is_simple_integrator: # 1/(sT): dy/dt = (num[0]/den[1]) * u f = (num[0] * diff_vars_x[0]) / den[1] else: # Generic TF fallback rhs = Const(0.0) lhs_wo_dy = Const(0.0) if not PI: for i, term in enumerate(np.array(den) * np.array(diff_vars_y)): if i != 1: lhs_wo_dy += term for term in np.array(num) * np.array(diff_vars_x): rhs += term f = (rhs - lhs_wo_dy) / Const(den[1]) # Integrator with non-windup limiter (Figure E.2 behavior): # - freeze when already at upper bound and f > 0 # - freeze when already at lower bound and f < 0 # - otherwise integrate with dy/dt = f if not multilinear: at_or_above_min = heaviside(y - sat_min) at_or_below_max = heaviside(sat_max - y) f_pos = heaviside(f) f_neg = Const(1.0) - f_pos allow_integrate = heaviside( at_or_above_min * at_or_below_max + (Const(1.0) - at_or_below_max) * f_neg + (Const(1.0) - at_or_above_min) * f_pos ) eq_main = (diff_vars_y[1] - allow_integrate * f).simplify() algebraic_vars = [y] + aux_vars algebraic_eqs = [eq_main] + aux_eqs init_eqs = {} else: sat_block, y_sat = mti_hard_sat( vf=var_factory, u=y, ul=sat_min, uu=sat_max, yl=sat_min, yu=sat_max, name=f"{name}_aw_sat", ) hv_up_block, at_upper = ml_heaviside(var_factory, y - sat_max, name=f"{name}_aw_up") hv_lo_block, at_lower = ml_heaviside(var_factory, sat_min - y, name=f"{name}_aw_lo") hv_f_block, f_pos = ml_heaviside(var_factory, f, name=f"{name}_aw_f") f_neg = Const(1.0) - f_pos freeze = at_upper * f_pos + at_lower * f_neg eq_main = (diff_vars_y[1] - (Const(1.0) - freeze) * f).simplify() algebraic_vars = [y_sat, y] + aux_vars + list(sat_block.algebraic_vars) + list(hv_up_block.algebraic_vars) + list(hv_lo_block.algebraic_vars) + list(hv_f_block.algebraic_vars) algebraic_eqs = [eq_main, y_sat - y] + aux_eqs + list(sat_block.algebraic_eqs) + list(hv_up_block.algebraic_eqs) + list(hv_lo_block.algebraic_eqs) + list(hv_f_block.algebraic_eqs) init_eqs = {**dict(sat_block.init_eqs), **dict(hv_up_block.init_eqs), **dict(hv_lo_block.init_eqs), **dict(hv_f_block.init_eqs)} block = Block( name=name, algebraic_vars=algebraic_vars, algebraic_eqs=algebraic_eqs, diff_vars=diff_vars, init_eqs=init_eqs, ) block.diff_init_eqs = diff_init_eqs # ADDED return block, y
[docs] def integrator_with_non_windup( var_factory: VarFactory, x: Var | Expr, T: Expr | float, y: Var = None, name: Optional[str] = '', sat_min: Expr = Const(float(-1e6)), sat_max: Expr = Const(float(1e6)), multilinear: bool = False): """ Integrator with non-windup limiter (Figure E.2 style): dy/dt = (1/T) * x with clamping logic: - if y >= sat_max and dy/dt would be positive -> dy/dt = 0 - if y <= sat_min and dy/dt would be negative -> dy/dt = 0 - otherwise dy/dt = (1/T) * x """ if y is None: y = var_factory.add_var('y_' + name) aux_eqs = [] aux_vars = [] init_eqs = {} if not isinstance(x, Var): u = var_factory.add_var('u_' + name) aux_eqs.append((u - x).simplify()) aux_vars.append(u) init_eqs[u] = x x = u if y.diff_var is None: dy = var_factory.add_diff_var(name=f'dt_1_{y.name}', base_var=y) else: dy = y.diff_var f = x / T if not multilinear: at_or_above_min = heaviside(y - sat_min) at_or_below_max = heaviside(sat_max - y) f_pos = heaviside(f) f_neg = Const(1.0) - f_pos allow_integrate = heaviside( at_or_above_min * at_or_below_max + (Const(1.0) - at_or_below_max) * f_neg + (Const(1.0) - at_or_above_min) * f_pos ) eq_main = (dy - allow_integrate * f).simplify() block = Block( name=name, algebraic_vars=[y] + aux_vars, algebraic_eqs=[eq_main] + aux_eqs, diff_vars=[dy], init_eqs=init_eqs, ) else: sat_block, y_sat_expr = ml_hard_sat( vf=var_factory, u=y, u_min=sat_min, u_max=sat_max, name=f"{name}_aw_sat", ) y_sat = var_factory.add_var(f"y_sat_{name}") hv_up_block, at_upper = ml_heaviside(var_factory, y - sat_max, name=f"{name}_aw_up") hv_lo_block, at_lower = ml_heaviside(var_factory, sat_min - y, name=f"{name}_aw_lo") hv_f_block, f_pos = ml_heaviside(var_factory, f, name=f"{name}_aw_f") f_neg = Const(1.0) - f_pos freeze = at_upper * f_pos + at_lower * f_neg eq_main = (dy - (Const(1.0) - freeze) * f).simplify() block = Block( name=name, algebraic_vars=[y_sat, y] + aux_vars + list(sat_block.algebraic_vars) + list(hv_up_block.algebraic_vars) + list(hv_lo_block.algebraic_vars) + list(hv_f_block.algebraic_vars), algebraic_eqs=[eq_main, y_sat - y_sat_expr, y_sat - y] + aux_eqs + list(sat_block.algebraic_eqs) + list(hv_up_block.algebraic_eqs) + list(hv_lo_block.algebraic_eqs) + list(hv_f_block.algebraic_eqs), diff_vars=[dy], init_eqs={ **init_eqs, **dict(sat_block.init_eqs), **dict(hv_up_block.init_eqs), **dict(hv_lo_block.init_eqs), **dict(hv_f_block.init_eqs), y_sat: hard_sat(y, sat_min, sat_max), }, ) return block, y
[docs] def integrator_with_windup( var_factory: VarFactory, x: Var | Expr, T: Expr | float, y: Var = None, name: Optional[str] = '', sat_min: Expr = Const(float(-1e6)), sat_max: Expr = Const(float(1e6)), multilinear: bool = False): """ Simple 1/(sT) integrator followed by output saturation (windup behavior). Internal state: dz/dt = x / T Output: y = sat(z) """ if y is None: y = var_factory.add_var('y_' + name) z = var_factory.add_var('z_' + name) dz = var_factory.add_diff_var(name=f'dt_1_{z.name}', base_var=z) aux_eqs = [] aux_vars = [] init_eqs = {z: Const(0.0)} if not isinstance(x, Var): u = var_factory.add_var('u_' + name) aux_eqs.append((u - x).simplify()) aux_vars.append(u) init_eqs[u] = x x = u eq_int = (dz - (x / T)).simplify() if not multilinear: eq_sat = (y - hard_sat(z, sat_min, sat_max)).simplify() block = Block( name=name, algebraic_vars=[y, z] + aux_vars, algebraic_eqs=[eq_int, eq_sat] + aux_eqs, diff_vars=[dz], init_eqs={**init_eqs, y: hard_sat(z, sat_min, sat_max)}, ) else: sat_block, y_sat_expr = ml_hard_sat( vf=var_factory, u=z, u_min=sat_min, u_max=sat_max, name=f"{name}_windup_sat", ) block = Block( name=name, algebraic_vars=[y, z] + aux_vars + list(sat_block.algebraic_vars), algebraic_eqs=[eq_int, y - y_sat_expr] + aux_eqs + list(sat_block.algebraic_eqs), diff_vars=[dz], init_eqs={**init_eqs, **dict(sat_block.init_eqs), y: hard_sat(z, sat_min, sat_max)}, ) return block, y
[docs] def tf_to_diffblock_with_antiwindup_by_feedback( var_factory: VarFactory, num: List[Var | float], den: List[Var | float], x: Var, y: Var = None, name: Optional[str] = '', sat_min: Expr = Const(-1e6), sat_max: Expr = Const(1e6), Kaw: Expr = Const(10.0), # anti-windup gain (1/Tt) ): """ Implements a back-calculation anti-windup controller using (u_sat - u) as feedback input. Works better for implicit solvers num: numerator coefficients [b0, ..., bm] den: denominator coefficients [a0, ..., an] x: input signal y: output (saturated) """ if len(num) > len(den): raise ValueError("Improper TF") if y is None: y = var_factory.add_var(f'y_{name}') aux_eqs = [] aux_vars = [] # Wrap input expression if not isinstance(x, Var): u_in = var_factory.add_var(f'u_in_{name}') aux_eqs.append((u_in - x).simplify()) aux_vars.append(u_in) x = u_in # -------------------------------------------------- # Build derivative chains # -------------------------------------------------- diff_vars = [] diff_x = [x] diff_y = [y] base = x for i in range(1, len(num)): dv = var_factory.add_diff_var(f'dt_{i}_{x.name}', base_var=base) diff_vars.append(dv) diff_x.append(dv) base = dv base = y for i in range(1, len(den)): dv = var_factory.add_diff_var(f'dt_{i}_{y.name}', base_var=base) diff_vars.append(dv) diff_y.append(dv) base = dv # -------------------------------------------------- # Unsaturated output (internal) # -------------------------------------------------- u = var_factory.add_var(f'u_{name}') aux_vars.append(u) tf_rhs = np.dot(num, diff_x) tf_lhs = np.dot(den, diff_y) aux_eqs.append((u - tf_rhs).simplify()) # -------------------------------------------------- # Saturation # -------------------------------------------------- u_sat = var_factory.add_var(f'u_sat_{name}') aux_vars.append(u_sat) sat_expr = ( sat_min + (u - sat_min) * heaviside(u - sat_min) - (u - sat_max) * heaviside(u - sat_max) ) aux_eqs.append((u_sat - sat_expr).simplify()) # -------------------------------------------------- # Main differential equation # Add anti-windup ONLY to integrator (dy/dt term) # -------------------------------------------------- lhs = Const(0) rhs = Const(0) for i, term in enumerate(den * np.array(diff_y)): lhs += term for term in num * np.array(diff_x): rhs += term # Anti-windup injection rhs += Kaw * (u_sat - u) eq_main = (lhs - rhs).simplify() block = Block( name=name, algebraic_vars=[y, u, u_sat] + aux_vars, algebraic_eqs=[eq_main] + aux_eqs, diff_vars=diff_vars, ) return block, y
[docs] def discrete_control_block( var_factory: VarFactory, m: Var, delta_m: Var, m_max: Var, m_min: Var, v: Var, v_ref: Var, delta_v: Var, ts: Var, # seconds name: Optional[str] = '' ) -> Tuple[Block, Var]: m_last = var_factory.add_var(f'm_last_{name}') t_signal = var_factory.add_var(f't_signal_{name}') tick = (t_signal >= ts).to_expression() # your switching increment (only evaluated on tick) inc = delta_m * (v - v_ref <= delta_v).to_expression() \ - delta_m * (v - v_ref >= delta_v).to_expression() changed = (1 - Comparison(m_last, CmpOp.EQ, m).to_expression()) dt = var_factory.add_var(f'dt') block = Block( name=name + '_input_controlled_switch', discrete_eqs={ t_signal: changed*(t_signal + dt), m:hard_sat(m + inc * tick, m_min, m_max), m_last: m, }, api_obj_mapping={dt: ParamPowerFlowReferenceType.dt} ) return block, m
[docs] def deadband_block( var_factory: VarFactory, x: Var | Expr, y: Var = None, name: Optional[str] = '', deadband: Expr = Const(0.1)) -> Tuple[Block, Var]: """ Creates a deadband function block. The deadband function returns: - 0 when input is within the deadband [-deadband, +deadband] - input - deadband when input > deadband - input + deadband when input < -deadband Parameters: ----------- var_factory: VarFactory instance for creating variables x: input signal (Var or Expr) y: output variable (optional, created if None) name: name prefix for variables deadband: deadband threshold (default 0.1) Returns: -------- Tuple of (Block, output_var) """ if y is None: y = var_factory.add_var('y_deadband_' + name) aux_eqs = [] aux_vars = [] # Wrap input expression if needed if not isinstance(x, Var): u = var_factory.add_var('u_deadband_' + name) aux_eqs.append((u - x).simplify()) aux_vars.append(u) x = u # Deadband logic using Heaviside step functions # y = (u - deadband) * H(u - deadband) + (u + deadband) * H(-deadband - u) # = (u - deadband) * H(u - deadband) + (u + deadband) * H(-u - deadband) h_pos = heaviside(x - deadband) h_neg = heaviside(-x - deadband) # Output expression output_expr = (x - deadband) * h_pos + (x + deadband) * h_neg # Algebraic equation: y - output_expr = 0 eq = (y - output_expr).simplify() block = Block( name=name + '_deadband', algebraic_vars=[y] + aux_vars, algebraic_eqs=[eq] + aux_eqs, diff_vars=[] ) return block, y