# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from typing import Dict, List, Sequence, Callable, Any
from collections import defaultdict
import numpy as np
import numba as nb
import scipy.sparse as sp
import math
import concurrent.futures
from VeraGridEngine.Utils.Symbolic.symbolic import (Var, Const, Expr, heaviside_num, expression2numba,
get_expression_vars)
from VeraGridEngine.basic_structures import Vec, IntVec
[docs]
def compile_jac_block(block_id,
block_eqs,
block_offsets,
used_vars_count,
compiler_names_dict,
alias_names_dict,
namespace,
VARS_NAME: str,
DIFF_NAME: str,
EVENT_PARAMS_NAME: str,
PARAMS_NAME: str,
use_jit):
"""
function that builds the compiled functions
:param block_id:
:param block_eqs:
:param block_offsets:
:param used_vars_count:
:param compiler_names_dict:
:param alias_names_dict:
:param namespace:
:param VARS_NAME:
:param DIFF_NAME:
:param EVENT_PARAMS_NAME:
:param PARAMS_NAME:
:param use_jit:
:return:
"""
fname = f"jac_block_{block_id}"
src = f"def {fname}({VARS_NAME}, {DIFF_NAME}, {EVENT_PARAMS_NAME}, {PARAMS_NAME}, h, out):\n"
final_names_dict = {}
for uid, count in used_vars_count.items():
if count > 1:
final_names_dict[uid] = alias_names_dict[uid]
src += f" {alias_names_dict[uid]} = {compiler_names_dict[uid]}\n"
else:
final_names_dict[uid] = compiler_names_dict[uid]
offset = block_offsets[block_id]
for i, e in enumerate(block_eqs):
src += f" out[{offset + i}] = {expression2numba(e.simplify(), final_names_dict)}\n"
exec(src, namespace)
if use_jit:
func = nb.njit(
nb.void(nb.float64[:], nb.float64[:], nb.float64[:], nb.float64[:], nb.float64, nb.float64[:]),
fastmath=True,
cache=False
)(namespace[fname])
else:
func = namespace[fname]
return block_id, func
[docs]
class SymbolicJacobian:
"""
Class to store and evaluate a symbolic jacobian
"""
__slots__ = (
"nvar",
"J",
"namespace",
"block_offsets",
"funcs",
)
def __init__(self,
eqs: list,
variables: list,
compiler_names_dict: dict,
alias_names_dict: dict,
VARS_NAME: str,
DIFF_NAME: str,
EVENT_PARAMS_NAME: str,
PARAMS_NAME: str,
dt=None,
use_jit=True,
batch_size=500,
n_jobs=4,
static=False): # número de threads para compilación paralela
"""
JIT‑compile a sparse Jacobian evaluator for *equations* w.r.t *variables*.
:param eqs: Array of equations
:param variables: Array of variables to differentiate against
:param uid2sym_vars: dictionary relating the uid of a var with its array name (i.e. var[0])
:param uid2sym_params:
:param use_jit: if true, the function is compiled by numba
:return:
jac_fn : callable(values: np.ndarray, params: np.ndarray) -> scipy.sparse.csc_matrix
Fast evaluator in which *values* is a 1‑D NumPy vector of length
``len(variables)``and *params* is a 1‑D NumPy vector of length
``len(parameters)``
sparsity_pattern : tuple(np.ndarray, np.ndarray)
Row/col indices of structurally non‑zero entries.
"""
h = Var('h')
alias_names_dict[h.uid] = 'h'
compiler_names_dict[h.uid] = 'h'
if static:
h = None
# --- Build triplets and count variable usage ---------------------------------
used_vars_count = defaultdict(int)
triplets = []
for row, eq in enumerate(eqs):
for col, var in enumerate(variables):
d_expression = eq.diff(var, dt=h)
if not (isinstance(d_expression, Const) and d_expression.value == 0):
triplets.append((col, row, d_expression))
for v in get_expression_vars(d_expression):
used_vars_count[v.uid] += 1
triplets.sort(key=lambda t: (t[0], t[1]))
cols_sorted, rows_sorted, equations_sorted = zip(*triplets) if triplets else ([], [], [])
# --- Assemble sparse CSC Jacobian structure ---------------------------------
nnz = len(cols_sorted)
indices = np.fromiter(rows_sorted, dtype=np.int32, count=nnz)
indptr = np.zeros(len(variables) + 1, dtype=np.int32)
for c in cols_sorted:
indptr[c + 1] += 1
np.cumsum(indptr, out=indptr)
self.nvar = len(variables)
neq = len(eqs)
self.J = sp.csc_matrix((np.zeros(nnz), indices, indptr), shape=(neq, self.nvar))
# --- Batch compilation ------------------------------------------------------
self.namespace: Dict[str, Any] = {
"math": math,
"np": np,
"nb": nb,
"_heaviside": heaviside_num,
}
blocks = [
equations_sorted[i:i + batch_size]
for i in range(0, len(equations_sorted), batch_size)
]
self.block_offsets = [i for i in range(0, len(equations_sorted), batch_size)]
self.funcs: List[Callable[[Vec, Vec, Vec, Vec, float, Vec], None] | None] = [None] * len(blocks)
# --- Compile blocks sequentially -------------------------------------------
# --- Compile blocks in parallel ---------------------------------------------
with concurrent.futures.ThreadPoolExecutor(max_workers=n_jobs) as executor:
futures = [
executor.submit(
compile_jac_block,
i, block, self.block_offsets, used_vars_count,
compiler_names_dict, alias_names_dict,
self.namespace, VARS_NAME, DIFF_NAME, EVENT_PARAMS_NAME, PARAMS_NAME, use_jit
)
for i, block in enumerate(blocks)
]
for f in concurrent.futures.as_completed(futures):
block_id, func = f.result()
self.funcs[block_id] = func
def __call__(self,
x: Vec,
dx: Vec,
variable_params: Vec,
constant_params: Vec,
h: float):
"""
:param x:
:param dx:
:param variable_params:
:param constant_params:
:param h:
:return:
"""
for func in self.funcs:
if func is not None:
func(x, dx, variable_params, constant_params, h, self.J.data)
return self.J
[docs]
class SymbolicVector:
"""
SymbolicFunction
"""
__slots__ = (
"func",
"namespace",
"data",
)
def __init__(self,
eqs: List[Expr],
compiler_names_dict: Dict[int, str],
alias_names_dict: Dict[int, str],
VARS_NAME: str,
DIFF_NAME: str,
EVENT_PARAMS_NAME: str,
PARAMS_NAME: str,
use_jit: bool = True):
"""
Compile the array of expressions to a function that returns an array of values for those expressions
:param eqs: Iterable of expressions (Expr)
:param uid2sym_vars: dictionary relating the uid of a var with its array name (i.e. var[0])
:param uid2sym_params:
:return: Function pointer that returns an array
"""
self.func: Callable[[Vec, Vec, Vec, Vec, Vec], None] | None = None
used_vars_count = defaultdict(int)
for eq in eqs:
for var in get_expression_vars(eq):
used_vars_count[var.uid] += 1
if var.uid not in compiler_names_dict.keys():
print()
if var.uid not in alias_names_dict.keys():
print()
if len(eqs):
self.namespace: Dict[str, Any] = {
"math": math,
"np": np,
"nb": nb,
"_heaviside": heaviside_num,
}
fname = f"func" # since we compile into a namespace, there won't be repeated names
# Build source
src = f"def {fname}({VARS_NAME}, {DIFF_NAME}, {EVENT_PARAMS_NAME}, {PARAMS_NAME}, out):\n"
final_names_dict = compiler_names_dict.copy()
for uid, count in used_vars_count.items():
if count > 1:
final_names_dict[uid] = alias_names_dict[uid]
src += f" {alias_names_dict[uid]} = {compiler_names_dict[uid]}\n"
src += "\n".join(
[f" out[{i}] = {expression2numba(e.simplify(), final_names_dict)}" for i, e in
enumerate(eqs)]) + "\n"
# compile in python namespace
exec(src, self.namespace)
# compile in numba
if use_jit:
self.func = nb.njit(
nb.void(nb.float64[:], nb.float64[:], nb.float64[:], nb.float64[:], nb.float64[:]),
fastmath=True)(self.namespace[fname])
else:
self.func = self.namespace[fname]
else:
self.func = None
self.data: Vec = np.zeros(len(eqs))
def __call__(self, values: Vec, diff_values: Vec, event_params: Vec, params: Vec) -> Vec:
"""
Call the compiled function
:param values:
:param diff_values:
:param event_params:
:param params:
:return:
"""
if self.func is not None:
self.func(values, diff_values, event_params, params, self.data)
return self.data
[docs]
def get_compiled_functions_cache_stats() -> Dict[str, float]:
"""
Return cache statistics for compiled symbolic helper functions.
The legacy compiled-function module does not yet expose a persistent cache,
so the reported cache entry count is zero. The function exists so EMT build
reports can depend on a stable interface while the cache work remains staged.
:return: Cache statistics.
:rtype: Dict[str, float]
"""
return dict(entry_count=0.0)
[docs]
class SymbolicParamsVector:
"""
SymbolicFunction
"""
__slots__ = (
"func",
"namespace",
"data",
)
def __init__(self, eqs: Sequence[Expr],
compiler_names_dict: Dict[int, str],
alias_names_dict: Dict[int, str],
EVENT_PARAMS_NAME: str,
TIME_NAME: str,
use_jit: bool = True):
"""
Compile the array of expressions to a function that returns an array of values for those expressions
:param eqs: Iterable of expressions (Expr)
:param uid2sym_t: dictionary relating the uid of a var with its array name (i.e. var[0])
:return: Function pointer that returns an array
"""
self.func: Callable[[Vec, float, Vec], None] | None = None
used_vars_count = defaultdict(int)
for eq in eqs:
for var in get_expression_vars(eq):
used_vars_count[var.uid] += 1
if len(eqs):
self.namespace: Dict[str, Any] = {
"math": math,
"np": np,
"nb": nb,
"_heaviside": heaviside_num,
}
fname = f"func" # since we compile into a namespace, there won't be repeated names
# Build source
src = f"def {fname}({EVENT_PARAMS_NAME}, {TIME_NAME}, out):\n"
final_names_dict = compiler_names_dict.copy()
for uid, count in used_vars_count.items():
if count > 1:
final_names_dict[uid] = alias_names_dict[uid]
src += f" {alias_names_dict[uid]} = {compiler_names_dict[uid]}\n"
src += "\n".join(
[f" out[{i}] = {expression2numba(e, compiler_names_dict)}" for i, e in enumerate(eqs)]) + "\n"
# compile in python namespace
exec(src, self.namespace)
# compile in numba
if use_jit:
self.func = nb.njit(nb.void(nb.float64[:], nb.float64, nb.float64[:]),
fastmath=True)(self.namespace[fname])
else:
self.func = self.namespace[fname]
else:
self.func = None
self.data = np.zeros(len(eqs))
def __call__(self, event_params: Vec, glob_time: float) -> Vec:
"""
Call the compiled function
:param glob_time:
:return:
"""
if self.func is not None:
self.func(event_params, glob_time, self.data)
return self.data
[docs]
class SymbolicDerivative:
"""
SymbolicFunction
"""
__slots__ = (
"func",
"data",
"index_map",
"namespace",
)
def __init__(self,
vars: Sequence[Var],
uid2idx_vars: Dict[int, int],
diff_vars: Sequence[Var],
compiler_names_dict: Dict[int, str],
use_jit: bool = True):
self.func: Callable[[Vec, Vec, Vec, float, Vec], Vec] | None = None
self.data: Vec = np.zeros(len(diff_vars))
self.index_map: IntVec = np.zeros(len(diff_vars), dtype=np.int32)
self.namespace: Dict[str, Any] = {
"math": math,
"np": np,
"nb": nb,
"_heaviside": heaviside_num,
}
fname = f"func" # since we compile into a namespace, there won't be repeated names
# Build source
uid2dx_expression = {}
# TODO: order diff vars by order
src = f"def {fname}(vars, lagvars, lagdx, h, out):\n"
for i, diff_var in enumerate(diff_vars):
base_var_uid = diff_var.base_var.uid
uid = diff_var.uid
if diff_var.diff_order == 1:
j = uid2idx_vars[base_var_uid]
rhs = f" (vars[{j}] - lagvars[{j}]) / h\n"
else:
dx_expression = uid2dx_expression[diff_var.base_var.uid]
rhs = f" ({dx_expression}- lagdx[{i}]) / h\n"
to_src = f" out[{i}] = {rhs}\n"
uid2dx_expression[uid] = rhs
src += to_src
src += f" return out"
exec(src, self.namespace)
# compile in python namespace
# compile in numba
if use_jit:
self.func = nb.njit(nb.float64[:](nb.float64[:], nb.float64[:], nb.float64[:], nb.float64, nb.float64[:]),
fastmath=True)(self.namespace[fname])
else:
self.func = self.namespace[fname]
def __call__(self, x: Vec, xn: Vec, dx: Vec, h: float) -> Vec:
"""
Call the compiled function
:param x:
:param xn:
:param dx:
:param h:
:return:
"""
if self.func is not None:
self.func(x, xn, dx, h, self.data)
return self.data
[docs]
class SymbolicParamsVectorInit:
"""
SymbolicFunction
"""
__slots__ = (
"func",
"namespace",
"data",
)
def __init__(self, eqs: Sequence[Expr],
compiler_names_dict: Dict[int, str],
alias_names_dict: Dict[int, str],
VARS_NAME: str,
EVENT_PARAMS_NAME: str,
TIME_NAME: str,
use_jit: bool = True,
):
"""
Compile the array of expressions to a function that returns an array of values for those expressions
:param eqs: Iterable of expressions (Expr)
:param uid2sym_t: dictionary relating the uid of a var with its array name (i.e. var[0])
:return: Function pointer that returns an array
"""
self.func: Callable[[Vec, Vec, float, Vec], None] | None = None
used_vars_count = defaultdict(int)
for eq in eqs:
for var in get_expression_vars(eq):
used_vars_count[var.uid] += 1
if len(eqs):
self.namespace: Dict[str, Any] = {
"math": math,
"np": np,
"nb": nb,
"_heaviside": heaviside_num,
}
fname = f"func" # since we compile into a namespace, there won't be repeated names
# Build source
src = f"def {fname}({VARS_NAME}, {EVENT_PARAMS_NAME}, {TIME_NAME}, out):\n"
final_names_dict = compiler_names_dict.copy()
for uid, count in used_vars_count.items():
if count > 1:
final_names_dict[uid] = alias_names_dict[uid]
src += f" {alias_names_dict[uid]} = {compiler_names_dict[uid]}\n"
src += "\n".join(
[f" out[{i}] = {expression2numba(e, compiler_names_dict)}" for i, e in enumerate(eqs)]) + "\n"
# compile in python namespace
exec(src, self.namespace)
# compile in numba
if use_jit:
self.func = nb.njit(nb.void(nb.float64[:], nb.float64[:], nb.float64, nb.float64[:]),
fastmath=True)(self.namespace[fname])
else:
self.func = self.namespace[fname]
else:
self.func = None
self.data = np.zeros(len(eqs))
def __call__(self, variables, event_params: Vec, glob_time: float) -> Vec:
"""
Call the compiled function
:param glob_time:
:return:
"""
if self.func is not None:
self.func(variables, event_params, glob_time, self.data)
return self.data