# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from typing import Callable, Any, Union, List, Dict, Tuple
from dataclasses import dataclass
import numba as nb
import numpy as np
import pandas as pd
from scipy.sparse import csc_matrix as csc
from scipy import sparse
import timeit
from matplotlib import pyplot as plt
from VeraGridEngine.basic_structures import Vec, CxVec
from VeraGridEngine.Utils.Sparse.csc import pack_3_by_4, diags
from VeraGridEngine.Utils.NumericalMethods.sparse_solve import get_linear_solver
from VeraGridEngine.enumerations import SparseSolver
linear_solver = get_linear_solver(SparseSolver.Pardiso)
[docs]
def step_calculation(v: Vec, dv: Vec, tau: float = 0.99995):
"""
This function calculates for each Lambda multiplier or its associated Slack variable
the maximum allowed step in order to not violate the KKT condition Lambda > 0 and S > 0
:param v: Array of multipliers or slack variables
:param dv: Variation calculated in the Newton step
:param tau: Factor to be not exactly 1
:return: step size value for the given multipliers
"""
k = np.flatnonzero(dv < 0.0)
if len(k) > 0:
alpha = min([tau * min(v[k] / (-dv[k] + 1e-15)), 1])
else:
alpha = 1
return alpha
[docs]
@nb.njit(cache=True)
def split(sol: Vec, n: int):
"""
Split the solution vector in two
:param sol: solution vector
:param n: integer position at whic to split the solution
:return: A before, B after the splitting point
"""
return sol[:n], sol[n:]
[docs]
@nb.njit(cache=True)
def calc_error(dx, dz, dmu, dlmbda):
"""
Calculate the error of the process
:param dx: x increments array
:param dz: z increments array
:param dmu: mu increments array
:param dlmbda: lambda increments array
:return: max abs value of all of the increments
"""
err = 0.0
for arr in [dx, dz, dmu, dlmbda]:
for i in range(len(arr)):
v = abs(arr[i])
if v > err:
err = v
return err
[docs]
@nb.njit(cache=True)
def max_abs(x: Vec):
"""
Compute max abs efficiently
:param x: State vector
:return: Inf-norm of the state vector
"""
max_val = 0.0
for x_val in x:
x_abs = x_val if x_val > 0.0 else -x_val
if x_abs > max_val:
max_val = x_val
return max_val
[docs]
def calc_feascond(g: Vec, h: Vec, x: Vec, z: Vec):
"""
Calculate the feasible conditions
:param g: Equality values
:param h: Inequality values
:param x: State vector
:param z: Vector of z slack variables
:return: Feasibility condition value
"""
return max(max_abs(g), np.max(h)) / (1.0 + max(max_abs(x), max_abs(z)))
[docs]
def calc_gradcond(lx: Vec, lam: Vec, mu: Vec):
"""
calculate the gradient conditions
:param lx: Gradient of the lagrangian
:param lam: Vector of lambda multipliers
:param mu: Vector of mu multipliers
:return: Gradient condition value
"""
return max_abs(lx) / (1 + max(max_abs(lam), max_abs(mu)))
[docs]
def calc_ccond(mu: Vec, z: Vec, x: Vec):
"""
:param mu: Vector of mu mutipliers
:param z: Vector of z slack variables
:param x: State vector
:return: Vector of ccond
"""
return (mu @ z) / (1.0 + max_abs(x))
[docs]
def calc_ocond(f: float, f_prev: float):
"""
:param f: Value of objective funciton
:param f_prev: Previous value of objective function
:return: Variation of the objective function
"""
return abs(f - f_prev) / (1.0 + abs(f_prev))
[docs]
@dataclass(slots=True)
class IpsFunctionReturn:
"""
Represents the returning value of the interior point evaluation
"""
f: float # objective function value
G: Vec # equalities increment vector
H: Vec # inequalities increment vector
fx: Vec # objective function Jacobian Vector
Gx: csc # equalities Jacobian Matrix
Hx: csc # inequalities Jacobian Matrix
fxx: csc # objective function Hessian Matrix
Gxx: csc # equalities Hessian Matrix
Hxx: csc # inequalities Hessian Matrix
# extra data passed through for the results
S: CxVec
Sf: CxVec
St: CxVec
[docs]
def get_data(self) -> List[Union[float, Vec, csc]]:
"""
Returns the structures in a list
:return: List of float, Vec, and csc
"""
return [self.f, self.G, self.H, self.fx, self.Gx, self.Hx, self.fxx, self.Gxx, self.Hxx]
[docs]
def compare(self, other: "IpsFunctionReturn", h: float) -> Dict[str, Union[float, Vec, csc]]:
"""
Returns the comparison between this structure and another structure of this type
:param other: IpsFunctionReturn
:param h: finite differences step
:return: Dictionary with the structure name and the difference
"""
errors = dict()
for i, (analytic_struct, finit_struct, name) in enumerate(zip(self.get_data(),
other.get_data(),
self.get_headers())):
# if isinstance(analytic_struct, np.ndarray):
if sparse.isspmatrix(analytic_struct):
a = analytic_struct.toarray()
b = finit_struct.toarray()
else:
a = analytic_struct
b = finit_struct
ok = np.allclose(a, b, atol=h * 10)
if not ok:
diff = a - b
errors[name] = diff
return errors
[docs]
@dataclass(slots=True)
class IpsSolution:
"""
Represents the returning value of the interior point solution
"""
x: Vec
error: float
gamma: float
lam: Vec
dlam: Vec
mu: Vec
z: Vec
residuals: Vec
structs: IpsFunctionReturn
converged: bool
iterations: int
error_evolution: Vec
[docs]
def plot_error(self):
"""
Plot the IPS error
"""
plt.figure()
plt.plot(self.error_evolution, )
plt.xlabel("Iterations")
plt.ylabel("Error")
plt.yscale('log')
plt.show()
[docs]
def interior_point_solver(x0: Vec,
n_x: int,
n_eq: int,
n_ineq: int,
func: Callable[[Vec, Vec, Vec, bool, bool, Any], IpsFunctionReturn],
arg=(),
max_iter=100,
tol=1e-6,
pf_init=False,
trust=0.9,
verbose: int = 0,
step_control=False) -> Tuple[IpsSolution, Vec]:
"""
Solve a non-linear problem of the form:
min: f(x)
s.t.
G(x) = 0
H(x) <= 0
xmin <= x <= xmax
The problem is specified by a function `f_eval`
This function is called with (x, mu, lmbda) and
returns (f, G, H, fx, Gx, Hx, fxx, Gxx, Hxx)
where:
x: array of variables
lambda: Lagrange Multiplier associated with the inequality constraints
pi: Lagrange Multiplier associated with the equality constraints
f: objective function value (float)
G: Array of equality mismatches (vec)
H: Array of inequality mismatches (vec)
fx: jacobian of f(x) (vec)
Gx: Jacobian of G(x) (CSC mat)
Hx: Jacobian of H(x) (CSC mat)
fxx: Hessian of f(x) (CSC mat)
Gxx: Hessian of G(x) (CSC mat)
Hxx: Hessian of H(x) (CSC mat)
See: On Computational Issues of Market-Based Optimal Power Flow by
Hongye Wang, Carlos E. Murillo-SΓ‘nchez, Ray D. Zimmerman, and Robert J. Thomas
IEEE TRANSACTIONS ON POWER SYSTEMS, VOL. 22, NO. 3, AUGUST 2007
:param x0: Initial solution
:param n_x: Number of variables (size of x)
:param n_eq: Number of equality constraints (rows of H)
:param n_ineq: Number of inequality constraints (rows of G)
:param func: A function pointer called with (x, mu, lmbda, *args) that returns (f, G, H, fx, Gx, Hx, fxx, Gxx, Hxx)
:param arg: Tuple of arguments to call func: func(x, mu, lmbda, *arg)
:param max_iter: Maximum number of iterations
:param tol: Convergence tolerance
:param pf_init: Use the power flow solution as initial values
:param trust: Amount of trust in the initial Newton derivative length estimation
:param verbose: 0 to 3 (the larger, the more verbose)
:param step_control: Use step control to improve the solution process control
:return: IpsSolution
"""
times = np.array([np.zeros(15)])
t_start = timeit.default_timer()
# Init iteration values
error = 1e6
iter_counter = 0
x = x0.copy()
gamma = 1.0
nabla = 0.05
rho_lower = 1.0 - nabla
rho_upper = 1.0 + nabla
e = np.ones(n_ineq)
# Our init, which computes the multipliers as a solution of the KKT conditions
if pf_init:
z0 = 1.0
z = z0 * np.ones(n_ineq)
lam = np.ones(n_eq)
mu = z.copy()
ret = func(x, mu, lam, True, False, *arg)
z = - ret.H
z = np.array([1e-2 if zz < 1e-2 else zz for zz in z])
z_inv = diags(1.0 / z)
mu = gamma * (z_inv @ e)
mu_diag = diags(mu)
lam = sparse.linalg.lsqr(ret.Gx.T, -ret.fx - ret.Hx.T @ mu.T)[0]
# PyPower-like init
else:
ret, _ = func(x, None, None, False, False, *arg)
z0 = 1.0
z = z0 * np.ones(n_ineq)
mu = z0 * np.ones(n_ineq)
lam = np.zeros(n_eq)
kk = np.flatnonzero(ret.H < -z0)
z[kk] = -ret.H[kk]
z_inv = diags(1.0 / z)
kk = np.flatnonzero((gamma / z) > z0)
mu[kk] = gamma / z[kk]
mu_diag = diags(mu)
ret, _ = func(x, mu, lam, True, False, *arg)
feascond = calc_feascond(g=ret.G, h=ret.H, x=x, z=z)
converged = error <= gamma
maxdispl = 0
error_evolution = np.zeros(max_iter + 1)
feascond_evolution = np.zeros(max_iter + 1)
# record initial values
feascond_evolution[iter_counter] = feascond
error_evolution[0] = error
n = np.zeros(n_x + n_eq)
dlam = None
while not converged and iter_counter < max_iter:
ts_iter = timeit.default_timer()
# Evaluate the functions, gradients and hessians at the current iteration.
if iter_counter == 0:
ret, new_times_0 = func(x, mu, lam, True, True, *arg)
else:
new_times_0 = 0
Hx_t = ret.Hx.T
Gx_t = ret.Gx.T
# compose the Jacobian
lxx = ret.fxx + ret.Gxx + ret.Hxx
m = lxx + Hx_t @ z_inv @ mu_diag @ ret.Hx
jac = pack_3_by_4(m.tocsc(), Gx_t.tocsc(), ret.Gx.tocsc())
# compose the residual
lx = ret.fx + Gx_t @ lam + Hx_t @ mu
n = lx + Hx_t @ z_inv @ (gamma * e + mu * ret.H)
r = - np.r_[n, ret.G]
# Find the reduced problem residuals and split them
ts_nrstep = timeit.default_timer()
dx, dlam = split(linear_solver(jac, r), n_x)
te_nrstep = timeit.default_timer()
# Calculate the inequalities residuals using the reduced problem residuals
ts_mult = timeit.default_timer()
dz = - ret.H - z - ret.Hx @ dx
dmu = - mu + z_inv @ (gamma * e - mu * dz)
te_mult = timeit.default_timer()
# Step control as in PyPower
if step_control:
l0 = ret.f + np.dot(lam, ret.G) + np.dot(mu, ret.H + z) - gamma * np.sum(np.log(z))
alpha = trust
for j in range(20):
dx1 = alpha * dx
x1 = x + dx1
ret1 = func(x1, mu, lam, False, False, *arg)
l1 = ret1.f + lam.T @ ret1.G + mu.T @ (ret1.H + z) - gamma * np.sum(np.log(z))
rho = (l1 - l0) / (lx @ dx1 + 0.5 * dx1.T @ lxx @ dx1)
if rho_lower < rho < rho_upper:
break
else:
alpha = alpha / 2.0
print('Use step control!')
dx = alpha * dx
dz = alpha * dz
dlam = alpha * dlam
dmu = alpha * dmu
# Compute the maximum step allowed
ts_steps = timeit.default_timer()
alpha_p = step_calculation(z, dz)
alpha_d = step_calculation(mu, dmu)
te_steps = timeit.default_timer()
# Update the values of the variables and multipliers
x += dx * alpha_p
z += dz * alpha_p
lam += dlam * alpha_d
mu += dmu * alpha_d
gamma = 0.1 * mu @ z / n_ineq
# Update fobj, g, h, calculate next step.
ret, new_times_i = func(x, mu, lam, True, True, *arg)
ts_conds = timeit.default_timer()
Hx_t = ret.Hx.T
Gx_t = ret.Gx.T
g_norm = np.linalg.norm(ret.G, np.inf)
lam_norm = np.linalg.norm(lam, np.inf)
mu_norm = np.linalg.norm(mu, np.inf)
z_norm = np.linalg.norm(z, np.inf)
lx = ret.fx + Hx_t @ mu + Gx_t @ lam
feascond = max([g_norm, max(ret.H)]) / (1 + max([np.linalg.norm(x, np.inf), z_norm]))
gradcond = np.linalg.norm(lx, np.inf) / (1 + max([lam_norm, mu_norm]))
error = np.max([feascond, gradcond, gamma])
maxdispl = np.max(np.r_[dx, dlam, dz, dmu])
z_inv = diags(1.0 / z)
mu_diag = diags(mu)
te_conds = timeit.default_timer()
converged = feascond < tol and gradcond < tol and gamma < tol
if verbose > 1:
print(f'Iteration: {iter_counter}', "-" * 80)
if verbose > 2:
x_df = pd.DataFrame(data={'x': x, 'dx': dx})
eq_df = pd.DataFrame(data={'Ξ»': lam, 'dΞ»': dlam})
ineq_df = pd.DataFrame(data={'mu': mu, 'z': z, 'dmu': dmu, 'dz': dz})
print("x:\n", x_df)
print("EQ:\n", eq_df)
print("INEQ:\n", ineq_df)
print("\tGamma:", gamma)
print("\tErr:", error)
print("\tMax Displacement:", maxdispl)
# Add an iteration step
iter_counter += 1
# record evolution
feascond_evolution[iter_counter] = feascond
error_evolution[iter_counter] = error
te_iter = timeit.default_timer()
new_times = np.r_[new_times_i+new_times_0, te_nrstep - ts_nrstep, te_mult - ts_mult, te_steps - ts_steps, te_conds - ts_conds, te_iter - ts_iter]
times_i = times.copy()
times = np.r_[times_i, [new_times]]
t_end = timeit.default_timer()
if verbose > 0:
print(f'SOLUTION', "-" * 80)
print(f"\tx:", x)
print(f"\tΞ»:", lam)
print(f"\tF.obj: {ret.f * 1e4}")
print(f"\tErr: {error}")
print(f'\tIterations: {iter_counter}')
print(f'\tMax Displacement: {maxdispl}' )
print(f'\tTime elapsed (s): {t_end - t_start}')
print(f'\tFeas cond: ', feascond)
return IpsSolution(x=x, error=error, gamma=gamma, lam=lam, dlam=dlam, mu=mu, z=z, residuals=n, structs=ret,
converged=converged, iterations=iter_counter, error_evolution=error_evolution), times