# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import numpy as np
import scipy.sparse as sp
from scipy.sparse import csc_matrix
from VeraGridEngine.Simulations.Rms.problems.rms_problem_dae import RmsProblemDae
from VeraGridEngine.Utils.Sparse.csc import pack_4_by_4_scipy
from VeraGridEngine.basic_structures import Vec, Mat
[docs]
class MidpointImplicitIntegration:
def __init__(self,
problem: RmsProblemDae,
t0: float,
t_end: float,
h: float,
max_iter: int,
tolerance: float = 1e-7,
dx0_init: Vec | None = None,
use_fd_jacobian: bool = True):
self.problem = problem
self.t0 = t0
self.h = h
self.max_iter_0 = max_iter
self.steps = int(np.ceil((t_end - t0) / h))
self.t: Vec = np.empty(self.steps + 1)
self.y: Mat = np.empty((self.steps + 1, self.problem.get_all_vars_number()))
self.tol = tolerance
self.dx0_init = dx0_init
self.use_fd_jacobian = use_fd_jacobian
def _rhs_implicit(self,
x_new: Vec,
x_prev: Vec,
dx: Vec,
h: float) -> tuple[Vec, Vec]:
x_mid = 0.5 * (x_new + x_prev)
f_algeb = self.problem.rhs_algebraic(x_mid, dx)
if self.problem.get_states_number() > 0:
f_state = self.problem.rhs_state(x_mid, dx)
f_state_update = x_new[:self.problem.get_states_number()] - x_prev[:self.problem.get_states_number()] - h * f_state
return np.r_[f_state_update, f_algeb], x_mid
return f_algeb, x_mid
def _jacobian_implicit(self,
x_mid: Vec,
dx: Vec,
h: float) -> sp.csc_matrix:
if self.problem.get_states_number() == 0:
j22: sp.csc_matrix = self.problem.get_j22(x_mid, dx, h)
return 0.5 * j22
j11_val: csc_matrix = self.problem.get_j11(x_mid, dx, h)
j12_val: csc_matrix = self.problem.get_j12(x_mid, dx, h)
j21_val: csc_matrix = self.problem.get_j21(x_mid, dx, h)
j22_val: csc_matrix = self.problem.get_j22(x_mid, dx, h)
I = sp.eye(m=self.problem.get_states_number(), n=self.problem.get_states_number())
j11: sp.csc_matrix = (I - 0.5 * h * j11_val).tocsc()
j12: sp.csc_matrix = -0.5 * h * j12_val
j21: sp.csc_matrix = 0.5 * j21_val
j22: sp.csc_matrix = 0.5 * j22_val
return pack_4_by_4_scipy(j11, j12, j21, j22)
def _jacobian_fd(self,
x_new: Vec,
x_prev: Vec,
dx_last: Vec,
h_eff: float,
rhs_base: Vec) -> sp.csc_matrix:
n = x_new.size
m = rhs_base.size
J = np.zeros((m, n), dtype=float)
base_eps = 1e-7
for j in range(n):
x_pert = x_new.copy()
step = base_eps * (1.0 + abs(float(x_new[j])))
x_pert[j] += step
dx_pert = self.problem.get_dx(x_pert, x_prev, dx_last, h_eff)
rhs_pert, _ = self._rhs_implicit(x_pert, x_prev, dx_pert, h_eff)
J[:, j] = (rhs_pert - rhs_base) / step
return sp.csc_matrix(J)
[docs]
def simulate(self):
converged: bool = False
well_initialized: bool = True
x0: Vec = self.problem.get_x0()
if self.dx0_init is None:
dx0: Vec = np.zeros(self.problem.get_diff_var_number(), dtype=float)
else:
dx0 = np.array(self.dx0_init, dtype=float, copy=True)
self.t[0] = self.t0
self.y[0, :] = x0.copy()
dx = dx0.copy()
dx_last = dx0.copy()
has_fmu_cs = True
has_fmu_me = True
try:
self.problem.initialize_fmu_cs_devices(x0, self.t0)
except AttributeError:
has_fmu_cs = False
try:
self.problem.initialize_fmu_me_devices(x0, self.t0)
except AttributeError:
has_fmu_me = False
try:
for step_idx in range(self.steps):
self.problem.report_progress2(step_idx, self.steps)
t_current_macro: float = self.t[step_idx]
t_macro_target: float = t_current_macro + self.h
x_prev = self.y[step_idx, :].copy()
x_new = x_prev.copy()
t_local_prev = t_current_macro
is_first_local_step = True
dx_last = dx
substep_converged = True
while t_local_prev < (t_macro_target - 1e-15):
forced_event_time: float | None = self.problem.get_next_forced_event_time(
t_local_prev,
t_macro_target,
)
if forced_event_time is None:
t_curr = t_macro_target
else:
t_curr = forced_event_time
h_eff: float = t_curr - t_local_prev
if h_eff <= 0.0:
raise RuntimeError(f"Invalid local step size h_eff={h_eff} while integrating RMS macro step {step_idx}.")
try:
self.problem.update_variable_params(t=t_local_prev, x_snapshot=x_new)
except TypeError:
self.problem.update_variable_params(t_local_prev)
self.problem.update(t_curr, x_new, self.problem._variable_parameters_values)
if has_fmu_cs:
self.problem.advance_fmu_cs_devices(t=t_local_prev, x_snapshot=x_prev, h=h_eff)
if has_fmu_me:
self.problem.advance_fmu_me_devices(t=t_local_prev, x_snapshot=x_prev, h=h_eff)
n_iter = 0
substep_converged = False
while not substep_converged and n_iter < self.max_iter_0:
dx = self.problem.get_dx(x_new, x_prev, dx_last, h_eff)
rhs, x_mid = self._rhs_implicit(x_new, x_prev, dx, h_eff)
residual = np.linalg.norm(rhs, np.inf)
substep_converged = residual < self.tol
if step_idx == 0 and is_first_local_step and (not substep_converged):
well_initialized = False
if not substep_converged:
if self.use_fd_jacobian:
Jf = self._jacobian_fd(x_new, x_prev, dx_last, h_eff, rhs)
else:
Jf = self._jacobian_implicit(x_mid, dx, h_eff)
delta = sp.linalg.spsolve(Jf, -rhs)
solved = np.all(np.isfinite(delta))
if not solved:
delta, *_ = sp.linalg.lsqr(Jf, -rhs)
solved = np.all(np.isfinite(delta))
if not solved:
break
accepted = False
alpha = 1.0
base_res = residual
for _ in range(8):
x_trial = x_new + alpha * delta
dx_trial = self.problem.get_dx(x_trial, x_prev, dx_last, h_eff)
rhs_trial, _ = self._rhs_implicit(x_trial, x_prev, dx_trial, h_eff)
trial_res = np.linalg.norm(rhs_trial, np.inf)
if np.isfinite(trial_res) and trial_res < base_res:
x_new = x_trial
accepted = True
break
alpha *= 0.5
if not accepted:
break
n_iter += 1
if substep_converged:
dx_last = dx.copy()
x_prev = x_new.copy()
t_local_prev = t_curr
is_first_local_step = False
else:
converged = False
break
if not substep_converged:
break
self.y[step_idx + 1, :] = x_prev
self.t[step_idx + 1] = t_macro_target
converged = True
finally:
if has_fmu_cs:
self.problem.close_fmu_cs_devices()
if has_fmu_me:
self.problem.close_fmu_me_devices()
return self.t, self.y, well_initialized, converged