Source code for VeraGridEngine.Utils.Symbolic.symbolic_io

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from __future__ import annotations
import copy
from typing import Dict, Any, List

from VeraGridEngine.Devices.Dynamic.var_factory import VarFactory, Connection
from VeraGridEngine.Utils.Symbolic import SharedVarReferenceType
from VeraGridEngine.Utils.Symbolic.symbolic import Var, Expr, Const, BinOp, UnOp, Func, Func2
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.enumerations import VarPowerFlowReferenceType, ParamPowerFlowReferenceType


[docs] def symbolic_objects_to_dict(obj_dict: Dict[int | str, Var | Const | Var | SharedVarReferenceType]) -> List[Dict[str, Any]]: """ Save the list of all unique vars, diffvars and const :param obj_dict: Dictionary storing the unique objects :return: List of dictionaries representing each object """ lst: List[Dict[str, Any]] = list() for uuid, expr in obj_dict.items(): if isinstance(expr, Const): # add it to the references dict obj_dict[expr.uid] = expr # The const didn't exist, we create it here if isinstance(expr.value, complex): lst.append({"type": "Const", "value": [expr.value.real, expr.value.imag], "kind": "complex", "uid": expr.uid}) else: lst.append({"type": "Const", "value": expr.value, "uid": expr.uid}) elif isinstance(expr, Var): shared = expr.shared_ref if expr.base_var is not None: if type(expr.ref) == str: lst.append({ "type": "DiffVar", "name": expr.name, "uid": expr.uid, "non_mutable_uid": expr.non_mutable_uid, "base_var": expr.base_var.uid, "shared_ref": {"name": shared.name if shared is not None else None, "uid": shared.uid if shared is not None else None}, "ref": expr.ref.value if expr.ref is not None else None, }) else: lst.append({ "type": "DiffVar", "name": expr.name, "uid": expr.uid, "non_mutable_uid": expr.non_mutable_uid, "base_var": expr.base_var.uid, "shared_ref": {"name": shared.name if shared is not None else None, "uid": shared.uid if shared is not None else None}, "ref": expr.ref.value if expr.ref is not None else None, }) else: # it is a normal var if type(expr.ref) == str: lst.append({"type": "Var", "name": expr.name, "uid": expr.uid, "non_mutable_uid": expr.non_mutable_uid, "base_var": None, "shared_ref": {"name": shared.name if shared is not None else None, "uid": shared.uid if shared is not None else None}, "ref": expr.ref.value if expr.ref is not None else None, }) else: lst.append({"type": "Var", "name": expr.name, "uid": expr.uid, "non_mutable_uid": expr.non_mutable_uid, "base_var": None, "shared_ref": {"name": shared.name if shared is not None else None, "uid": shared.uid if shared is not None else None}, "ref": expr.ref.value if expr.ref is not None else None, }) if isinstance(expr, SharedVarReferenceType): lst.append({"type": "share_reference", "name": expr.name, "uid": expr.uid}) if isinstance(expr, Connection): lst.append({"type": "Connection", "non_mutable_uid": expr.non_mutable_uid, "name": expr.name, "uid": expr.uid}) return lst
[docs] def connections_to_dict(obj_dict: Dict[int, List[Connection]]) -> Dict[int, List[Any]]: conn_dict: Dict[int, List[Any]] = dict() for uuid, connections_list in obj_dict.items(): conn_list: List[Any] = list() for connection in connections_list: conn_list.append({"type": "Connection", "non_mutable_uid": connection.non_mutable_uid, "name": connection.name, "uid": connection.uid}) conn_dict[uuid] = conn_list return conn_dict
[docs] def expr_to_dict(expr: Expr, const_dict: Dict[int, Const], var_dict: Dict[int, Var], diff_var_dict: Dict[int, Var]) -> Dict[str, Any]: """ Serialise any `Expr` tree into a plain Python dictionary that’s JSON-friendly. Each node type becomes a small dict that records: β€’ its own type (\"Const\", \"Var\", \"BinOp\", …) β€’ the data it carries (value, name, operator…) β€’ its unique uid (string, so it survives round-trip) β€’ nested children (recursively serialised) :param expr: Expression child :param const_dict: Dictionary that keeps a reference of the Const objects already saved :param var_dict: Dictionary that keeps a reference of the VAr objects already saved :param diff_var_dict: Dictionary that keeps a reference of the DiffVar objects already saved :return: Dict to save in jason """ if isinstance(expr, Const): c = const_dict.get(expr.uid, None) if c is None: # add it to the references dict const_dict[expr.uid] = expr # the const already exists return { "type": "Const", "uid": expr.uid } elif isinstance(expr, Var): if expr.base_var is not None: c = diff_var_dict.get(expr.uid, None) if c is None: # add it to the references dict diff_var_dict[expr.uid] = expr # the diffvar already exists return { "type": "DiffVar", "uid": expr.uid } else: c = var_dict.get(expr.uid, None) if c is None: # add it to the references dict var_dict[expr.uid] = expr # the var already exists return { "type": "Var", "uid": expr.uid } elif isinstance(expr, BinOp): return { "type": "BinOp", "op": expr.op, "left": expr_to_dict(expr=expr.left, const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict), "right": expr_to_dict(expr=expr.right, const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict), "uid": expr.uid, } elif isinstance(expr, UnOp): return { "type": "UnOp", "op": expr.op, # only \"-\" for now "operand": expr_to_dict(expr=expr.operand, const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict), "uid": expr.uid, } elif isinstance(expr, Func): return { "type": "Func", "op": expr.op, # sin, cos, log, … "arg": expr_to_dict(expr=expr.arg, const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict), "uid": expr.uid, } elif isinstance(expr, Func2): return { "type": "Func2", "name": expr.name, "arg1": expr_to_dict(expr=expr.arg1, const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict), "arg2": expr_to_dict(expr=expr.arg2, const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict), "uid": expr.uid, } else: raise TypeError(f"Unsupported Expr subclass: {type(expr).__name__}")
[docs] def expr_list_to_list(lst: List[Expr], const_dict: Dict[int, Const], var_dict: Dict[int, Var], diff_var_dict: Dict[int, Var]) -> List[Dict[str, Any]]: """ :param lst: :param const_dict: :param var_dict: :param diff_var_dict: :return: """ lst2: List[Dict[str, Any]] = list() for expr in lst: lst2.append( expr_to_dict(expr=expr, const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict) ) return lst2
[docs] def parse_expr(data: Dict[str, Any], const_dict: Dict[int, Const], var_dict: Dict[int, Var], diff_var_dict: Dict[int, Var]) -> Const | Var | UnOp | BinOp | Func | Func2: """ De-Serialize expression from dictionary :param data: Some dictionary containing the expression :param const_dict: Dictionary that keeps a reference of the Const objects already saved :param var_dict: Dictionary that keeps a reference of the VAr objects already saved :param diff_var_dict: Dictionary that keeps a reference of the DiffVar objects already saved :return: Expression chil """ t = data["type"] if t == "Const": return const_dict[data["uid"]] elif t == "Var": return var_dict[data["uid"]] elif t == "DiffVar": return diff_var_dict[data["uid"]] elif t == "BinOp": left = parse_expr(data=data["left"], const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict) right = parse_expr(data=data["right"], const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict) obj = BinOp(left=left, op=data["op"], right=right, uid=data["uid"]) elif t == "UnOp": operand = parse_expr(data=data["operand"], const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict) obj = UnOp(op=data["op"], operand=operand, uid=data["uid"]) elif t == "Func": arg = parse_expr(data=data["arg"], const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict) obj = Func(arg=arg, op=data["op"], uid=data["uid"]) elif t == "Func2": arg1: Const | Var | UnOp | BinOp | Func | Func2 = parse_expr(data=data["arg1"], const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict) arg2: Const | Var | UnOp | BinOp | Func | Func2 = parse_expr(data=data["arg2"], const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict) obj = Func2(name=data["name"], arg1=arg1, arg2=arg2, uid=data["uid"]) else: raise ValueError(f"Unknown type '{t}' in symbolic deserialization") return obj
[docs] def parse_expr_list(lst: List[Dict[str, Any]], const_dict: Dict[int, Const], var_dict: Dict[int, Var], diff_var_dict: Dict[int, Var]) -> List[Const | Var | UnOp | BinOp | Func | Func2]: """ :param lst: :param const_dict: :param var_dict: :param diff_var_dict: :return: """ lst2 = list() for data in lst: lst2.append( parse_expr( data=data, const_dict=const_dict, var_dict=var_dict, diff_var_dict=diff_var_dict ) ) return lst2
[docs] class BlockSaver: __slots__ = ( "var_factory", "main_block_uids", "blocks", ) def __init__(self, var_factory: VarFactory): self.var_factory = var_factory self.main_block_uids: List[int] = list() self.blocks: Dict[int, Dict[str, Any]] = dict()
[docs] def get_const_to_save(self) -> List[Dict[str, Any]]: """ :return: """ return symbolic_objects_to_dict(self.var_factory.get_const_dict())
[docs] def get_vars_to_save(self) -> List[Dict[str, Any]]: """ :return: """ return symbolic_objects_to_dict(self.var_factory.get_vars_dict())
[docs] def get_diff_vars_to_save(self) -> List[Dict[str, Any]]: """ :return: """ return symbolic_objects_to_dict(self.var_factory.get_diff_var_dict())
[docs] def get_shared_references_to_save(self) -> List[Dict[str, Any]]: """ :return: :rtype: """ return symbolic_objects_to_dict(self.var_factory.get_references_dict())
[docs] def get_connections_to_save(self) -> Dict[int, List[Any]]: """ :return: :rtype: """ return connections_to_dict(self.var_factory.get_connections_dict())
[docs] def get_blocks(self) -> Dict[int, Dict[str, Any]]: return self.blocks
def _ensure_var_registered(self, var: Var) -> None: """ Ensure that one algebraic variable is present in the shared variable factory. Some blocks created through GUI/template workflows keep live variable objects that are not reinserted into the factory before serialization. The saver must register them instead of failing deep in a background thread. :param var: Variable to register. :return: None. """ try: found_var = self.var_factory.get_var(var.uid) except Exception: found_var = None if found_var is None: self.var_factory._var_dict[var.uid] = var else: pass def _ensure_diff_var_registered(self, diff_var: Var) -> None: """ Ensure that one differential variable is present in the shared variable factory. :param diff_var: Differential variable to register. :return: None. """ try: found_var = self.var_factory.get_diff_var(diff_var.uid) except Exception: found_var = None if found_var is None: self.var_factory._diff_var_dict[diff_var.uid] = diff_var else: pass
[docs] def save_block(self, blk: Block, main: bool = False) -> Dict[str, Any]: """ Get a dictionary representing the block All "global references" such a as Conts, Var and DiffVar are stored in the class for later :param blk: Block :param main: is it the main block? :return: Dictionary representing the block """ state_expressions = expr_list_to_list( lst=blk.state_eqs, const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict() ) algebraic_expressions = expr_list_to_list( lst=blk.algebraic_eqs, const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict() ) differential_eqs_expressions = expr_list_to_list( lst=blk.differential_eqs, const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict() ) init_eq_list = list() diff_init_eq_list = list() for var, expr in blk.init_eqs.items(): self._ensure_var_registered(var) init_eq_list.append( { "var": var.uid, "expr": expr_to_dict( expr=expr, const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict() ) } ) for diff_var, expr in blk.diff_init_eqs.items(): self._ensure_diff_var_registered(diff_var) diff_init_eq_list.append( { "var": diff_var.uid, "expr": expr_to_dict( expr=expr, const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict() ) } ) events_list = list() for var, expr in blk.event_dict.items(): self._ensure_var_registered(var) events_list.append( { "var": var.uid, "expr": expr_to_dict( expr=expr, const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict() ) } ) in_vars: List[int] = list() for var in blk.in_vars: self._ensure_var_registered(var) in_vars.append(var.uid) out_vars: List[int] = list() for var in blk.out_vars: self._ensure_var_registered(var) out_vars.append(var.uid) init_values: List[Dict[str, float | int]] = list() for var, value in blk.init_values.items(): self._ensure_var_registered(var) init_values.append({"var": var.uid, "value": value.value}) parameters: List[Dict[str, float | int]] = list() for var, value in blk.parameters.items(): self._ensure_var_registered(var) parameters.append({"var": var.uid, "value": value.value}) # diff_vars: List[DiffVar] = list() for diff_var in blk.diff_vars: self._ensure_diff_var_registered(diff_var) for dyn_var_type, var in blk.external_mapping.items(): if var is not None: self._ensure_var_registered(var) else: pass # save diagram diagram = blk.diagram.to_dict() # save children for child in blk.children: self.save_block(child) d = { "uid": blk.uid, "vars_glob_name2uid": blk.vars_glob_name2uid, "state_vars": [v.uid for v in blk.state_vars], "state_eqs": state_expressions, "algebraic_vars": [v.uid for v in blk.algebraic_vars], "algebraic_eqs": algebraic_expressions, "diff_vars": [v.uid for v in blk.diff_vars], "differential_eqs": differential_eqs_expressions, "reformulated_vars": [v.uid for v in blk.reformulated_vars], "init_eqs": init_eq_list, "diff_init_eqs": diff_init_eq_list, "init_values": init_values, "parameters": parameters, "external_mapping": {dyn_var_type.value: var.uid if var is not None else None for dyn_var_type, var in blk.external_mapping.items()}, "api_obj_mapping": {dyn_param_type.value: param.uid if param is not None else None for dyn_param_type, param in blk.api_obj_mapping.items()}, "event_dict": events_list, "name": blk.name, "children": [child.uid for child in blk.children], "in_vars": in_vars, "out_vars": out_vars, "diagram": diagram } self.blocks[blk.uid] = d if main: self.main_block_uids.append(blk.uid) return d
[docs] class BlockParser: __slots__ = ( "var_factory", "block_dict", ) def __init__(self, var_factory: VarFactory): self.var_factory = var_factory self.block_dict: Dict[int, Block] = dict() def _get_var_by_non_mutable_uid(self, non_mutable_uid: int) -> Var: """ Recover one algebraic variable using its stable non-mutable UID. The symbolic connection system intentionally aliases connected variables by rewriting their runtime ``uid`` values. That aliasing is required by the solver, but block interfaces such as ``in_vars`` and ``out_vars`` must still be reconstructed with the original variable objects so metadata like ``ref`` keeps the semantic meaning assigned by the template author. Looking up ports by the mutable ``uid`` after replaying saved connections can therefore return a different connected variable, such as a bus variable replacing a branch terminal variable. :param non_mutable_uid: Stable symbolic variable identity. :return: Matching algebraic variable. :raises KeyError: If the variable is not present in the factory. """ var_obj: Var | None = self.var_factory.get_vars_dict().get(non_mutable_uid, None) if var_obj is not None: return var_obj else: raise KeyError(f"Var with non_mutable_uid {non_mutable_uid} was not found in VarFactory") def _find_var_or_diff_var_by_non_mutable_uid(self, non_mutable_uid: int) -> Var | None: """ Recover one symbolic variable or differential variable by stable identity. External mappings expose semantic power-flow references to specific symbolic variables. Saved connection replay may alias runtime ``uid`` values across connected variables, so rebuilding external mappings by mutable ``uid`` can return a different connected variable and lose the original semantic role. This helper resolves the mapping through the stable dictionary keys used by the variable factory instead. :param non_mutable_uid: Stable symbolic identity stored in the file. :return: Matching variable or ``None`` when the mapping is unresolved. """ var_obj: Var | None = self.var_factory.get_vars_dict().get(non_mutable_uid, None) if var_obj is not None: return var_obj else: diff_var_obj: Var | None = self.var_factory.get_diff_var_dict().get(non_mutable_uid, None) if diff_var_obj is not None: return diff_var_obj else: return None
[docs] def parse_consts(self, data: List[Dict[str, Any]]): """ :param data: :return: """ self.var_factory.parse_const_dict(data_list=data)
[docs] def parse_vars(self, data: List[Dict[str, Any]]): """ :param data: :return: """ self.var_factory.parse_var_dict(data_list=data)
[docs] def parse_diff_vars(self, data: List[Dict[str, Any]]): """ :param data: :return: """ self.var_factory.parse_diff_var_dict(data_list=data)
[docs] def parse_references(self, data: List[Dict[str, Any]]): """ :param data: :type data: :return: :rtype: """ self.var_factory.parse_references_dict(datalist=data)
[docs] def parse_connections(self, data: Dict[int, List[Any]]): """ :param data: :type data: :return: :rtype: """ self.var_factory.parse_connections_dict(datalist=data)
[docs] def parse_block(self, blocks_data: Dict[int, Dict[str, Any]], main_block_uid: int) -> Block: """ Parse block as :param main_block_uid: :param blocks_data: :return: """ if main_block_uid not in blocks_data: main_block_uid = str(main_block_uid) data = blocks_data[main_block_uid] state_vars = [self.var_factory.get_var(v_uid) for v_uid in data["state_vars"]] state_eqs = parse_expr_list(lst=data["state_eqs"], const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict()) algebraic_vars = [self.var_factory.get_var(v_uid) for v_uid in data["algebraic_vars"]] algebraic_eqs = parse_expr_list(lst=data["algebraic_eqs"], const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict()) if data["diff_vars"]: diff_vars = [self.var_factory.get_diff_var(v_uid) for v_uid in data["diff_vars"]] else: diff_vars = [] differential_eqs = parse_expr_list(lst=data["differential_eqs"], const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict()) # Rebuild interface variables using the stable non-mutable UID stored # in the factory keys. This preserves the original port object and its # semantic reference even when runtime connections have aliased the # mutable ``uid`` to another connected variable. in_vars = [self._get_var_by_non_mutable_uid(v_uid) for v_uid in data["in_vars"]] # Apply the same stable lookup to outputs for consistency with inputs # and to avoid replacing exported branch variables with connected bus # variables after deserialization. out_vars = [self._get_var_by_non_mutable_uid(v_uid) for v_uid in data["out_vars"]] children = [self.parse_block(blocks_data, child_uid) for child_uid in data["children"]] init_eqs: Dict[Var, Expr] = dict() for entry in data["init_eqs"]: var = self.var_factory.get_var(entry["var"]) init_eqs[var] = parse_expr(data=entry["expr"], const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict()) event_dict: Dict[Var, Expr] = dict() for entry in data["event_dict"]: var = self.var_factory.get_var(entry["var"]) event_dict[var] = parse_expr(data=entry["expr"], const_dict=self.var_factory.get_const_dict(), var_dict=self.var_factory.get_vars_dict(), diff_var_dict=self.var_factory.get_diff_var_dict()) parameters: Dict[Var, Const] = dict() for entry in data["parameters"]: var = self.var_factory.get_var(entry["var"]) parameters[var] = Const(entry["value"]) init_values: Dict[Var, Const] = dict() for entry in data["init_values"]: var = self.var_factory.get_var(entry["var"]) init_values[var] = Const(entry["value"]) external_mapping: Dict[VarPowerFlowReferenceType, Var | None] = dict() for key_str, var_uid in data["external_mapping"].items(): key = VarPowerFlowReferenceType(key_str) # Rebuild PF-exposed mappings using the stable symbolic identity. # This preserves the original variable object selected by the # template even when runtime connections have aliased mutable UIDs. var_in_varfactory = self._find_var_or_diff_var_by_non_mutable_uid(var_uid) if var_in_varfactory is not None: external_mapping[key] = var_in_varfactory else: external_mapping[key] = None api_obj_mapping: Dict[ParamPowerFlowReferenceType, Var] = dict() for key_str, var_uid in data["api_obj_mapping"].items(): key = ParamPowerFlowReferenceType(key_str) api_obj_mapping[key] = self.var_factory.get_var(var_uid) reformulated_vars = [self.var_factory.get_var(v_uid) for v_uid in data["reformulated_vars"]] block = Block( state_vars=state_vars, state_eqs=state_eqs, algebraic_vars=algebraic_vars, algebraic_eqs=algebraic_eqs, diff_vars=diff_vars, differential_eqs=differential_eqs, in_vars=in_vars, out_vars=out_vars, init_eqs=init_eqs, event_dict=event_dict, children=children, # TODO think about this parameters=parameters, init_values=init_values, external_mapping=external_mapping, api_obj_mapping=api_obj_mapping, reformulated_vars=reformulated_vars, name=data["name"], uid=data["uid"] ) diagram_data = data.get("diagram", None) if diagram_data is not None: block.diagram.parse(diagram_data) self.block_dict[block.uid] = block return block
[docs] def block_deep_copy(block: Block, var_factory: VarFactory): """ Create depp copy of a block :param block: :param var_factory: :return: """ # TODO: avoid passing to json saver = BlockSaver(var_factory) d = saver.save_block(block, main=True) const_save = saver.get_const_to_save() vars_save = saver.get_vars_to_save() diff_vars_save = saver.get_diff_vars_to_save() blocks = saver.get_blocks() parser = BlockParser(VarFactory()) parser.parse_consts(data=const_save) parser.parse_vars(data=vars_save) parser.parse_diff_vars(data=diff_vars_save) block2 = parser.parse_block(blocks, block.uid) return block2
[docs] def duplicate_var(var_factory: VarFactory, old_to_new_var: Dict[int, Var], var: Var | None) -> Var | None: """ Duplicate one symbolic variable while preserving UID-based reuse. The helper now accepts ``None`` because some FMU/EMT external mappings intentionally leave optional references unresolved. In those cases the missing mapping must stay missing after the block clone. :param var_factory: Variable factory used to allocate the cloned variable. :param old_to_new_var: UID-to-variable clone map. :param var: Source variable or ``None``. :return: Cloned variable or ``None``. """ if var is None: new_var: Var | None = None else: if var.uid in old_to_new_var: new_var = old_to_new_var[var.uid] else: base_var_new: Var | None = None if var.base_var is not None: base_var_new = duplicate_required_var(var_factory, old_to_new_var, var.base_var) # Differential variables must be linked to the cloned base variable. Passing the # original differential pointer here would reconnect the new chain to the source. new_var = var_factory.add_diff_var( name=var.name, reference=var.ref, network_conn=var.network_conn, shared_reference=var.shared_ref, diff_var=None, base_var=base_var_new ) else: # Base variables are allocated without pulling their derivative pointer through. # The derivative clone will attach itself when it is duplicated. new_var = var_factory.add_var( name=var.name, reference=var.ref, network_conn=var.network_conn, shared_reference=var.shared_ref, ) old_to_new_var[var.uid] = new_var return new_var
[docs] def duplicate_required_var(var_factory: VarFactory, old_to_new_var: Dict[int, Var], var: Var) -> Var: """ Duplicate a variable that must exist in the source block structure. :param var_factory: Variable factory used to allocate the cloned variable. :param old_to_new_var: UID-to-variable clone map. :param var: Source variable. :return: Cloned variable. """ new_var: Var | None = duplicate_var(var_factory, old_to_new_var, var) if new_var is None: raise TypeError("duplicate_required_var: source variable cannot be None") else: return new_var
[docs] def duplicate_const(var_factory: VarFactory, old_to_new_const: Dict[int, Const], const: Const) -> Const: """ :param var_factory: :param old_to_new_const: :param const: :return: """ if const.uid in old_to_new_const: return old_to_new_const[const.uid] new_const = var_factory.add_const(value=const.value, name=const.name) old_to_new_const[const.uid] = new_const return new_const
[docs] def duplicate_expr(var_factory: VarFactory, old_to_new_const: Dict[int, Const], old_to_new_var: Dict[int, Var], expr: Expr) -> Expr: """ :param var_factory: :param old_to_new_const: :param old_to_new_var: :param expr: :return: """ if isinstance(expr, Var): return duplicate_required_var(var_factory, old_to_new_var, expr) if isinstance(expr, Const): return duplicate_const(var_factory, old_to_new_const, expr) if isinstance(expr, BinOp): return BinOp( duplicate_expr(var_factory, old_to_new_const, old_to_new_var, expr.left), expr.op, duplicate_expr(var_factory, old_to_new_const, old_to_new_var, expr.right) ) if isinstance(expr, UnOp): return UnOp(expr.op, duplicate_expr(var_factory, old_to_new_const, old_to_new_var, expr.operand)) if isinstance(expr, Func): return Func(duplicate_expr(var_factory, old_to_new_const, old_to_new_var, expr.arg), expr.op) if isinstance(expr, Func2): return Func2(expr.name, duplicate_expr(var_factory, old_to_new_const, old_to_new_var, expr.arg1), duplicate_expr(var_factory, old_to_new_const, old_to_new_var, expr.arg2)) return expr
def _remember_var(var: Var | None, vars_by_uid: Dict[int, Var]) -> None: """ Collect one variable and its derivative chain by uid. :param var: Variable to collect, or ``None`` for optional mappings. :param vars_by_uid: Accumulator keyed by source variable UID. :return: Nothing. """ if var is None: pass else: if var.uid in vars_by_uid: pass else: vars_by_uid[var.uid] = var _remember_var(var.base_var, vars_by_uid) _remember_var(var.diff_var, vars_by_uid) def _remember_expr_vars(expr: Expr, vars_by_uid: Dict[int, Var]) -> None: """ Collect variables referenced by one expression. :param expr: Expression to scan. :param vars_by_uid: Accumulator keyed by source variable UID. :return: Nothing. """ var: Var for var in expr.get_vars(): _remember_var(var, vars_by_uid) def _collect_block_vars_by_uid(block: Block, vars_by_uid: Dict[int, Var] | None = None) -> Dict[int, Var]: """ Collect all variables reachable from a block, including expression-only references. :param block: Block to scan. :param vars_by_uid: Optional accumulator for recursive child scans. :return: Variables keyed by source UID. """ if vars_by_uid is None: result: Dict[int, Var] = dict() else: result = vars_by_uid for var_list in ( block.state_vars, block.algebraic_vars, block.diff_vars, block.reformulated_vars, block.in_vars, block.out_vars, ): var: Var for var in var_list: _remember_var(var, result) for mapping in ( block.parameters, block.init_values, block.init_eqs, block.diff_init_eqs, block.discrete_eqs, block.event_dict, block.mode_dict, ): mapping_key: Var mapping_value: Expr for mapping_key, mapping_value in mapping.items(): _remember_var(mapping_key, result) if isinstance(mapping_value, Expr): _remember_expr_vars(mapping_value, result) else: pass external_var: Var | None for external_var in block.external_mapping.values(): _remember_var(external_var, result) api_var: Var | None for api_var in block.api_obj_mapping.values(): _remember_var(api_var, result) for expr_list in ( block.state_eqs, block.algebraic_eqs, block.differential_eqs, ): expr: Expr for expr in expr_list: _remember_expr_vars(expr, result) child: Block for child in block.children: _collect_block_vars_by_uid(child, result) return result def _build_var_mapping(old_vars_by_uid: Dict[int, Var], old_to_new_var: Dict[int, Var]) -> Dict[Expr | str, Expr]: """ Build the object/name substitution map used by procedural logic cloning. :param old_vars_by_uid: Source variables keyed by UID. :param old_to_new_var: Cloned variables keyed by source UID. :return: Procedural logic variable substitution map. """ mapping: Dict[Expr | str, Expr] = dict() uid: int old_var: Var for uid, old_var in old_vars_by_uid.items(): new_var: Var | None = old_to_new_var.get(uid, None) if new_var is not None: mapping[old_var] = new_var mapping[old_var.name] = new_var else: pass return mapping def _duplicate_block(block: Block, var_factory: VarFactory, old_to_new_var: Dict[int, Var], old_to_new_const: Dict[int, Const]) -> Block: """ Duplicate a block using shared maps so child/parent variable links remain coherent. :param block: Source block. :param var_factory: Variable factory used to allocate cloned variables and constants. :param old_to_new_var: UID-to-variable clone map shared across the block tree. :param old_to_new_const: UID-to-constant clone map shared across the block tree. :return: Duplicated block. """ old_vars_by_uid: Dict[int, Var] = _collect_block_vars_by_uid(block) for old_var in old_vars_by_uid.values(): duplicate_var(var_factory, old_to_new_var, old_var) const: Const for const in block.parameters.values(): duplicate_const(var_factory, old_to_new_const, const) for const in block.init_values.values(): duplicate_const(var_factory, old_to_new_const, const) # Clone the variable containers from the shared UID map so every repeated # reference inside the source block points to the same cloned object. new_state_vars: List[Var] = [duplicate_required_var(var_factory, old_to_new_var, v) for v in block.state_vars] new_algebraic_vars: List[Var] = [ duplicate_required_var(var_factory, old_to_new_var, v) for v in block.algebraic_vars ] new_diff_vars: List[Var] = [duplicate_required_var(var_factory, old_to_new_var, v) for v in block.diff_vars] new_reformulated_vars: List[Var] = [ duplicate_required_var(var_factory, old_to_new_var, v) for v in block.reformulated_vars ] new_in_vars: List[Var] = [duplicate_required_var(var_factory, old_to_new_var, v) for v in block.in_vars] new_out_vars: List[Var] = [duplicate_required_var(var_factory, old_to_new_var, v) for v in block.out_vars] # Clone equations after all variables are known, otherwise expression-only # variables could accidentally diverge from the block variable containers. new_state_eqs: List[Expr] = [ duplicate_expr(var_factory, old_to_new_const, old_to_new_var, e) for e in block.state_eqs ] new_algebraic_eqs: List[Expr] = [ duplicate_expr(var_factory, old_to_new_const, old_to_new_var, e) for e in block.algebraic_eqs ] new_differential_eqs: List[Expr] = [ duplicate_expr(var_factory, old_to_new_const, old_to_new_var, e) for e in block.differential_eqs ] new_init_eqs: Dict[Var, Expr] = { duplicate_required_var(var_factory, old_to_new_var, k): duplicate_expr( var_factory, old_to_new_const, old_to_new_var, v, ) for k, v in block.init_eqs.items() } new_diff_init_eqs: Dict[Var, Expr] = { duplicate_required_var(var_factory, old_to_new_var, k): duplicate_expr( var_factory, old_to_new_const, old_to_new_var, v, ) for k, v in block.diff_init_eqs.items() } new_discrete_eqs: Dict[Var, Expr] = { duplicate_required_var(var_factory, old_to_new_var, k): duplicate_expr( var_factory, old_to_new_const, old_to_new_var, v, ) for k, v in block.discrete_eqs.items() } new_event_dict: Dict[Var, Expr] = { duplicate_required_var(var_factory, old_to_new_var, k): duplicate_expr( var_factory, old_to_new_const, old_to_new_var, v, ) for k, v in block.event_dict.items() } new_mode_dict: Dict[Var, Expr] = { duplicate_required_var(var_factory, old_to_new_var, k): duplicate_expr( var_factory, old_to_new_const, old_to_new_var, v, ) for k, v in block.mode_dict.items() } new_parameters: Dict[Var, Const] = { duplicate_required_var(var_factory, old_to_new_var, k): duplicate_const(var_factory, old_to_new_const, v) for k, v in block.parameters.items() } new_init_values: Dict[Var, Const] = { duplicate_required_var(var_factory, old_to_new_var, k): duplicate_const(var_factory, old_to_new_const, v) for k, v in block.init_values.items() } new_external_mapping: Dict[VarPowerFlowReferenceType, Var | None] = { k: duplicate_var(var_factory, old_to_new_var, v) for k, v in block.external_mapping.items() } new_api_obj_mapping: Dict[ParamPowerFlowReferenceType, Var | None] = { k: duplicate_var(var_factory, old_to_new_var, v) for k, v in block.api_obj_mapping.items() } new_children: List[Block] = [ _duplicate_block(child, var_factory, old_to_new_var, old_to_new_const) for child in block.children ] if block.procedural_logic: from VeraGridEngine.Utils.procedural_logic import clone_procedural_logic_entries new_procedural_logic: List[Any] = clone_procedural_logic_entries( entries=block.procedural_logic, var_mapping=_build_var_mapping(old_vars_by_uid, old_to_new_var), ) else: new_procedural_logic = list() new_block = Block( state_vars=new_state_vars, state_eqs=new_state_eqs, algebraic_vars=new_algebraic_vars, algebraic_eqs=new_algebraic_eqs, diff_vars=new_diff_vars, reformulated_vars=new_reformulated_vars, differential_eqs=new_differential_eqs, parameters=new_parameters, init_values=new_init_values, init_eqs=new_init_eqs, diff_init_eqs=new_diff_init_eqs, discrete_eqs=new_discrete_eqs, children=new_children, in_vars=new_in_vars, out_vars=new_out_vars, event_dict=new_event_dict, mode_dict=new_mode_dict, procedural_logic=new_procedural_logic, external_mapping=new_external_mapping, api_obj_mapping=new_api_obj_mapping, name=block.name, ) extra_key: str extra_value: Any for extra_key, extra_value in block.__dict__.items(): if extra_key in new_block.__dict__: pass else: setattr(new_block, extra_key, copy.deepcopy(extra_value)) return new_block
[docs] def duplicate_block(block: Block, var_factory: VarFactory | None) -> Block: """ Create a duplicate of this block with new variable UIDs. The new block contains variables with the same names but different UIDs, and equations rebuilt using those new variables. :param block: Source block. :param var_factory: Variable factory used to allocate cloned variables and constants. :return: A new Block with duplicated variables and equations. """ if var_factory is None: raise TypeError("duplicate_block: var_factory cannot be None") else: return _duplicate_block( block=block, var_factory=var_factory, old_to_new_var=dict(), old_to_new_const=dict(), )
[docs] def compare_blocks(block1: Block, block2: Block, var_factory1: VarFactory, var_factory2: VarFactory, testing=False): """ Create depp copy of a block :param block1: :param block2: :param var_factory: :param testing: :return: """ # TODO: do not use dictionaries and compare directly using the block information saver1 = BlockSaver(var_factory1) d1 = saver1.save_block(block1, main=False) const_save1 = saver1.get_const_to_save() vars_save1 = saver1.get_vars_to_save() diff_vars_save1 = saver1.get_diff_vars_to_save() blocks1 = saver1.get_blocks() saver2 = BlockSaver(var_factory2) d2 = saver2.save_block(block2, main=False) const_save2 = saver2.get_const_to_save() vars_save2 = saver2.get_vars_to_save() diff_vars_save2 = saver2.get_diff_vars_to_save() blocks2 = saver2.get_blocks() return (blocks1, const_save1, vars_save1, diff_vars_save1) == (blocks2, const_save2, vars_save2, diff_vars_save2)