Source code for VeraGridEngine.Templates.Emt.arbitrary_source_emt_template

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from __future__ import annotations

from typing import Dict, List, Sequence

from VeraGridEngine.Devices.Dynamic.emt_template import EmtModelTemplate
from VeraGridEngine.Devices.Dynamic.var_factory import VarFactory
from VeraGridEngine.Templates.Emt.load_RLC_emt_template import _get_phase_count_name
from VeraGridEngine.Utils.Symbolic.block import Expr, Var
from VeraGridEngine.Utils.Symbolic import symbolic as sym
from VeraGridEngine.Utils.Symbolic.symbolic import CmpOp
from VeraGridEngine.Utils.Symbolic.symbolic import Comparison
from VeraGridEngine.enumerations import DeviceType, VarPowerFlowReferenceType


def _get_active_nabc_labels(phN: bool, phA: bool, phB: bool, phC: bool) -> List[str]:
    active_labels: List[str] = list()

    if phN:
        active_labels.append("N")
    if phA:
        active_labels.append("A")
    if phB:
        active_labels.append("B")
    if phC:
        active_labels.append("C")

    if active_labels:
        return active_labels
    else:
        raise ValueError("At least one source terminal must be enabled for an arbitrary EMT source template")


def _get_voltage_reference(phase_label: str) -> VarPowerFlowReferenceType:
    if phase_label == "N":
        return VarPowerFlowReferenceType.v_N
    elif phase_label == "A":
        return VarPowerFlowReferenceType.v_A
    elif phase_label == "B":
        return VarPowerFlowReferenceType.v_B
    elif phase_label == "C":
        return VarPowerFlowReferenceType.v_C
    else:
        raise ValueError(f"Unsupported source phase label '{phase_label}'")


def _get_current_reference(phase_label: str) -> VarPowerFlowReferenceType:
    if phase_label == "N":
        return VarPowerFlowReferenceType.i_N
    elif phase_label == "A":
        return VarPowerFlowReferenceType.i_A
    elif phase_label == "B":
        return VarPowerFlowReferenceType.i_B
    elif phase_label == "C":
        return VarPowerFlowReferenceType.i_C
    else:
        raise ValueError(f"Unsupported source phase label '{phase_label}'")


def _build_external_mapping(voltage_vars: Dict[str, Var], current_vars: Dict[str, Var]) -> Dict[VarPowerFlowReferenceType, Var | None]:
    return dict({
        VarPowerFlowReferenceType.v_N: voltage_vars.get("N", None),
        VarPowerFlowReferenceType.v_A: voltage_vars.get("A", None),
        VarPowerFlowReferenceType.v_B: voltage_vars.get("B", None),
        VarPowerFlowReferenceType.v_C: voltage_vars.get("C", None),
        VarPowerFlowReferenceType.i_N: current_vars.get("N", None),
        VarPowerFlowReferenceType.i_A: current_vars.get("A", None),
        VarPowerFlowReferenceType.i_B: current_vars.get("B", None),
        VarPowerFlowReferenceType.i_C: current_vars.get("C", None),
    })


def _build_template_name(base_name: str,
                         phN: bool,
                         phA: bool,
                         phB: bool,
                         phC: bool,
                         requested_name: str | None) -> str:
    phase_count: int = int(phN) + int(phA) + int(phB) + int(phC)
    return _get_phase_count_name(base_name=base_name, phase_count=phase_count, requested_name=requested_name)


def _build_lookup_comparison(lhs: Expr, op: CmpOp, rhs: Expr) -> Expr:
    return Comparison(lhs=lhs, op=op, rhs=rhs).to_expression()


def _build_segment_expression(coord_var: Var,
                              x_left_var: Var,
                              x_right_var: Var,
                              y_left_var: Var,
                              y_right_var: Var) -> Expr:
    slope_expr: Expr = (y_right_var - y_left_var) / (x_right_var - x_left_var)
    intercept_expr: Expr = y_left_var - slope_expr * x_left_var
    return slope_expr * coord_var + intercept_expr


def _validate_waveform_points(time_points: Sequence[float], value_points: Sequence[float]) -> None:
    point_count: int = len(time_points)
    point_index: int

    if point_count == len(value_points):
        pass
    else:
        raise ValueError("Arbitrary source time/value vectors must have the same size")

    if point_count >= 2:
        pass
    else:
        raise ValueError("Arbitrary source waveform requires at least two points")

    for point_index in range(point_count - 1):
        if float(time_points[point_index + 1]) > float(time_points[point_index]):
            pass
        else:
            raise ValueError("Arbitrary source waveform times must be strictly increasing")


def _build_clipped_lookup_expression(coord_var: Var, x_vars: Sequence[Var], y_vars: Sequence[Var]) -> Expr:
    point_count: int = len(x_vars)
    result_expr: Expr = y_vars[0] * _build_lookup_comparison(coord_var, CmpOp.LT, x_vars[0])
    segment_index: int

    for segment_index in range(point_count - 1):
        segment_expr: Expr = _build_segment_expression(
            coord_var=coord_var,
            x_left_var=x_vars[segment_index],
            x_right_var=x_vars[segment_index + 1],
            y_left_var=y_vars[segment_index],
            y_right_var=y_vars[segment_index + 1],
        )
        lower_expr: Expr = _build_lookup_comparison(coord_var, CmpOp.GE, x_vars[segment_index])
        upper_expr: Expr = _build_lookup_comparison(coord_var, CmpOp.LT, x_vars[segment_index + 1])
        result_expr = result_expr + segment_expr * lower_expr * upper_expr

    result_expr = result_expr + y_vars[point_count - 1] * _build_lookup_comparison(coord_var, CmpOp.GE, x_vars[point_count - 1])
    return result_expr


def _build_waveform_parameters(vf: VarFactory,
                               template: EmtModelTemplate,
                               time_points: Sequence[float],
                               value_points: Sequence[float],
                               name: str) -> tuple[list[Var], list[Var]]:
    x_vars: list[Var] = list()
    y_vars: list[Var] = list()
    point_index: int
    parameter_var: Var

    _validate_waveform_points(time_points, value_points)

    for point_index in range(len(time_points)):
        parameter_var = vf.add_var(f"arr_x{point_index + 1}_{name}")
        template.block.event_dict[parameter_var] = vf.add_const(float(time_points[point_index]), name=f"arr_x{point_index + 1}")
        x_vars.append(parameter_var)

        parameter_var = vf.add_var(f"arr_y{point_index + 1}_{name}")
        template.block.event_dict[parameter_var] = vf.add_const(float(value_points[point_index]), name=f"arr_y{point_index + 1}")
        y_vars.append(parameter_var)

    return x_vars, y_vars


[docs] def get_arbitrary_waveform_current_source_emt_template(vf: VarFactory, phN: bool = False, phA: bool = True, phB: bool = False, phC: bool = False, time_points: Sequence[float] = (0.0, 0.02, 0.04), value_points: Sequence[float] = (0.0, 1.0, 0.0), name: str | None = "arbitrary_waveform_current_source_emt") -> EmtModelTemplate: """ Build one phase-selective EMT current source driven by one arbitrary time waveform. The same time/value waveform is applied to every active phase. :param vf: EMT variable factory. :param phN: Whether neutral is active. :param phA: Whether phase A is active. :param phB: Whether phase B is active. :param phC: Whether phase C is active. :param time_points: Strictly increasing waveform times in seconds. :param value_points: Matching waveform values. :param name: Optional symbolic block name. :return: Configured EMT template. """ active_labels: List[str] = _get_active_nabc_labels(phN=phN, phA=phA, phB=phB, phC=phC) resolved_name: str = _build_template_name("arbitrary_waveform_current_source_emt", phN, phA, phB, phC, name) template: EmtModelTemplate = EmtModelTemplate() template.tpe = DeviceType.GeneratorDevice template.name = resolved_name template.block.name = resolved_name tau_var: Var = vf.add_var(name=f"t_src_{resolved_name}") d_tau_var: Var = vf.add_diff_var(name=f"d_t_src_{resolved_name}", base_var=tau_var) x_vars, y_vars = _build_waveform_parameters(vf=vf, template=template, time_points=time_points, value_points=value_points, name=resolved_name) waveform_expr: Expr = _build_clipped_lookup_expression(coord_var=tau_var, x_vars=x_vars, y_vars=y_vars) voltage_vars: Dict[str, Var] = dict() current_vars: Dict[str, Var] = dict() in_vars: List[Var] = list() algebraic_vars: List[Var] = list() algebraic_eqs: List[Expr] = list() out_vars: List[Var] = list() phase_label: str for phase_label in active_labels: voltage_var: Var = vf.add_var(name=f"v_{phase_label}_{resolved_name}", reference=_get_voltage_reference(phase_label)) current_var: Var = vf.add_var(name=f"i_{phase_label}_{resolved_name}", reference=_get_current_reference(phase_label)) voltage_vars[phase_label] = voltage_var current_vars[phase_label] = current_var in_vars.append(voltage_var) algebraic_vars.append(current_var) algebraic_eqs.append(current_var - waveform_expr) out_vars.append(current_var) template.block.init_eqs[current_var] = y_vars[0] template.block.in_vars = in_vars template.block.state_vars = [tau_var] template.block.diff_vars = [d_tau_var] template.block.state_eqs = [sym.Const(1.0)] template.block.algebraic_vars = algebraic_vars template.block.algebraic_eqs = algebraic_eqs template.block.out_vars = out_vars template.block.external_mapping = _build_external_mapping(voltage_vars, current_vars) template.block.init_eqs[tau_var] = x_vars[0] return template
[docs] def get_arbitrary_waveform_voltage_source_emt_template(vf: VarFactory, phN: bool = False, phA: bool = True, phB: bool = False, phC: bool = False, time_points: Sequence[float] = (0.0, 0.02, 0.04), value_points: Sequence[float] = (0.0, 1.0, 0.0), source_conductance_value: float = 100.0, name: str | None = "arbitrary_waveform_voltage_source_emt") -> EmtModelTemplate: """ Build one phase-selective EMT voltage source driven by one arbitrary time waveform. The same time/value waveform is applied to every active phase and injected as one Norton equivalent ``i = g * (v_src - v_bus)``. :param vf: EMT variable factory. :param phN: Whether neutral is active. :param phA: Whether phase A is active. :param phB: Whether phase B is active. :param phC: Whether phase C is active. :param time_points: Strictly increasing waveform times in seconds. :param value_points: Matching waveform values. :param source_conductance_value: Norton conductance. :param name: Optional symbolic block name. :return: Configured EMT template. """ active_labels: List[str] = _get_active_nabc_labels(phN=phN, phA=phA, phB=phB, phC=phC) resolved_name: str = _build_template_name("arbitrary_waveform_voltage_source_emt", phN, phA, phB, phC, name) template: EmtModelTemplate = EmtModelTemplate() template.tpe = DeviceType.GeneratorDevice template.name = resolved_name template.block.name = resolved_name tau_var: Var = vf.add_var(name=f"t_src_{resolved_name}") d_tau_var: Var = vf.add_diff_var(name=f"d_t_src_{resolved_name}", base_var=tau_var) conductance_var: Var = vf.add_var(name=f"g_src_{resolved_name}") template.block.event_dict[conductance_var] = vf.add_const(float(source_conductance_value)) x_vars, y_vars = _build_waveform_parameters(vf=vf, template=template, time_points=time_points, value_points=value_points, name=resolved_name) waveform_expr: Expr = _build_clipped_lookup_expression(coord_var=tau_var, x_vars=x_vars, y_vars=y_vars) voltage_vars: Dict[str, Var] = dict() current_vars: Dict[str, Var] = dict() in_vars: List[Var] = list() algebraic_vars: List[Var] = list() algebraic_eqs: List[Expr] = list() out_vars: List[Var] = list() phase_label: str for phase_label in active_labels: voltage_var: Var = vf.add_var(name=f"v_{phase_label}_{resolved_name}", reference=_get_voltage_reference(phase_label)) current_var: Var = vf.add_var(name=f"i_{phase_label}_{resolved_name}", reference=_get_current_reference(phase_label)) voltage_vars[phase_label] = voltage_var current_vars[phase_label] = current_var in_vars.append(voltage_var) algebraic_vars.append(current_var) algebraic_eqs.append(current_var - conductance_var * (waveform_expr - voltage_var)) out_vars.append(current_var) template.block.init_eqs[current_var] = conductance_var * (y_vars[0] - voltage_var) template.block.in_vars = in_vars template.block.state_vars = [tau_var] template.block.diff_vars = [d_tau_var] template.block.state_eqs = [sym.Const(1.0)] template.block.algebraic_vars = algebraic_vars template.block.algebraic_eqs = algebraic_eqs template.block.out_vars = out_vars template.block.external_mapping = _build_external_mapping(voltage_vars, current_vars) template.block.init_eqs[tau_var] = x_vars[0] return template