#!/usr/bin/env python
# coding=utf-8
"""This module regroups all the implemented temporal schemes.
They are written as callable class which take the model and some control
arguments at the init, and perform a computation step every time they are
called.
The following solvers are implemented:
* Backward and Forward Euler, Crank-Nicolson method (with the Theta class)
* Some Rosenbrock Wanner schemes (up to the 6th order) with time controler
* All the scipy.integrate.ode integrators with the scipy_ode class.
"""
import warnings
from functools import partial, wraps
import numpy as np
import scipy.sparse as sps
from loguru import logger
from scipy.integrate import solve_ivp
from scipy.interpolate import interp1d
from scipy.linalg import norm
from toolz import memoize
with warnings.catch_warnings():
warnings.filterwarnings("ignore", module="fuzz")
from fuzzywuzzy import process
available_temporal_schemes = {}
[docs]def get_temporal_scheme(name):
"""get a temporal_scheme by its name
Arguments:
name {str} -- temporal_scheme name
Raises:
NotImplementedError -- raised if the temporal_scheme is not available.
Returns:
TemporalScheme -- the requested temporal_scheme
"""
try:
return available_temporal_schemes[name.lower()]
except KeyError:
err_msg = "%s temporal_scheme is not registered." % name
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
(suggest, score), = process.extract(
name, available_temporal_schemes.keys(), limit=1
)
if score > 70:
err_msg += (
"\n%s is available and seems to be close. "
"It may be what you are looking for !" % suggest
)
err_msg += "\nFull list of available temporal_schemes:\n\t- %s" % (
"\n\t- ".join(available_temporal_schemes.keys())
)
raise NotImplementedError(err_msg)
[docs]def register_temporal_scheme(CustomTemporalScheme):
global available_temporal_schemes
if TemporalScheme not in CustomTemporalScheme.__mro__:
raise AttributeError(
"The provider temporal_scheme should inherit from the "
"TemporalScheme base class."
)
available_temporal_schemes[CustomTemporalScheme.name.lower()] = CustomTemporalScheme
return CustomTemporalScheme
[docs]def null_hook(t, fields):
return fields
[docs]def add_time_stepping(scheme, tol=1e-2, ord=2, m=20, reject_factor=2):
internal_dt = None
def one_step(t, fields, dt, hook):
dt_ = dt
while True:
_, fields_ = scheme(t, fields, m * dt_, hook)
for _ in range(m):
t, fields = scheme(t, fields, dt_, hook)
errs = [
np.linalg.norm(
np.atleast_1d(fields[key]) - np.atleast_1d(fields_[key]), ord
)
/ (m ** 2 - 1)
for key in fields.data_vars
]
err = max(errs)
dt_ = np.sqrt(dt ** 2 * tol / err)
if dt_ < dt / reject_factor:
continue
break
return t, fields, dt_
@wraps(scheme)
def adaptatif_scheme(t, fields, dt, hook=null_hook):
nonlocal internal_dt
next_step = t + dt
internal_dt = internal_dt if internal_dt else dt
while t + internal_dt <= next_step:
t, fields, internal_dt = one_step(t, fields, internal_dt / m, hook)
if t < next_step:
t, fields = scheme(t, fields, next_step - t, hook)
return t, fields
return adaptatif_scheme
[docs]class TemporalScheme:
pass
[docs]class ROW_general(TemporalScheme):
"""Rosenbrock Wanner class of temporal solvers
The implementation and the different parameters can be found in
http://www.digibib.tu-bs.de/?docid=00055262
"""
@memoize
def __cache__(self, N):
Id = sps.eye(N, format="csc")
return Id
def __init__(
self,
model,
alpha,
gamma,
b,
b_pred=None,
time_stepping=False,
tol=None,
max_iter=None,
dt_min=None,
safety_factor=0.9,
solver="auto",
recompute_target=True,
iteratif_atol=1e-3,
initial_dt=None,
):
self._internal_dt = 1e-6 if initial_dt is None else initial_dt
self._model = model
self._alpha = alpha
self._gamma = gamma
self._b = b
self._b_pred = b_pred
self._s = len(b)
self._time_control = time_stepping
self._internal_iter = None
self._tol = tol
self._safety_factor = safety_factor
self._max_iter = max_iter
self._dt_min = dt_min
self._recompute_target = recompute_target
self._solver = solver
self._interp_cache = None
self._atol = iteratif_atol
def __call__(self, t, fields, dt, hook=null_hook):
"""Perform a step of the solver: took a time and a system state as a
skfdiff Fields container and return the next time step with updated
container.
Parameters
----------
t : float
actual time step
fields : skfdiff.Fields
actual system state in a skfdiff Fields
dt : float
temporal step-size
pars : dict
physical parameters of the model
hook : callable, optional
any callable taking the actual time, fields and parameters and
return modified fields and parameters. Will be called every
internal time step and can be used to include time dependent or
conditionnal parameters, boundary conditions...
container
Returns
-------
tuple : t, fields
updated time and fields container
Raises
------
NotImplementedError
raised if a time stepping is requested but the scheme do not
provide the b predictor coefficients.
ValueError
raised if time_stepping is True and tol is not provided.
""" # noqa
if self._time_control:
return self._variable_step(t, fields, dt, hook=hook)
t, fields, _ = self._fixed_step(t, fields, dt, hook=hook)
fields = hook(t, fields)
return t, fields
def _fixed_step(self, t, fields, dt, hook=null_hook):
fields = fields.copy()
fields = hook(t, fields)
J = self._model.J(fields, t)
if self._solver == "auto":
self._solver = "iteratif" if J.size > 1e4 else "direct"
U = self._model.U_from_fields(fields)
Id = self.__cache__(U.size)
self._A = A = Id - self._gamma[0, 0] * dt * J
if self._solver == "direct":
luf = sps.linalg.factorized(A)
dtF = (
self._model.F(hook(t + dt, fields), t + dt) - self._model.F(fields, t)
) / dt
ks = []
for i in np.arange(self._s):
alpha_i = sum([self._alpha[i, j] for j in range(i)])
gamma_i = sum([self._gamma[i, j] for j in range(i)])
t_i = t + alpha_i * dt
fields_i = hook(
t_i,
self._model.fields_from_U(
U + dt * sum([self._alpha[i, j] * ks[j] for j in range(i)]), fields
),
)
F = self._model.F(fields_i, t_i)
B = (
F
+ dt
* (
J @ sum([self._gamma[i, j] * ks[j] for j in range(i)])
if i > 0
else 0
)
+ dt * gamma_i * dtF
)
if self._solver == "direct":
Utilde = luf(B)
elif self._solver == "iteratif":
Utilde = sps.linalg.gcrotmk(
A, B, U if not ks else ks[-1], atol=self._atol
)[0]
else:
Utilde = self._solver(A, B)
ks.append(Utilde)
U = U + dt * sum([bi * ki for bi, ki in zip(self._b, ks)])
U_pred = (
U + dt * sum([bi * ki for bi, ki in zip(self._b_pred, ks)])
if self._b_pred is not None
else None
)
fields = hook(t + dt, self._model.fields_from_U(U, fields))
err = norm(U - U_pred, np.inf) if U_pred is not None else None
return t + dt, fields, err
def _iter_until_reaching_tol(self, fields, t, dt, hook):
while self._err is None or self._err > self._tol:
new_t, new_fields, self._err = self._fixed_step(t, fields, dt, hook)
logger.debug("error: {}".format(self._err))
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
computed_dt = self._safety_factor * dt * np.sqrt(self._tol / self._err)
if np.isinf(computed_dt):
computed_dt = 1 / self._safety_factor * dt
dt = self._internal_dt = computed_dt
return new_t, dt, new_fields
def _variable_step(self, t, fields, dt, hook=null_hook):
self._next_time_step = t + dt
self._max_dt = t
self._internal_iter = 0
try:
fields = self._model.fields_from_U(
self._interp_cache(self._next_time_step), fields
)
return self._next_time_step, fields
except (TypeError, ValueError):
pass
if not self._recompute_target:
dt = self._internal_dt
else:
dt = self._internal_dt = min(self._internal_dt, dt)
while True:
self._err = None
new_t, dt, new_fields = self._iter_until_reaching_tol(fields, t, dt, hook)
logger.debug("dt computed after err below tol: %g" % dt)
logger.debug("ROS_vart, t %g" % t)
if new_t >= self._next_time_step:
target_dt = self._next_time_step - t
logger.debug(
"new t more than next expected time step, "
"compute with proper timestep %g" % target_dt
)
if self._recompute_target:
t, fields, self._err = self._fixed_step(
t, fields, self._next_time_step - t, hook
)
else:
self._interp_cache = interp1d(
[t, new_t],
[
self._model.U_from_fields(fields)[None],
self._model.U_from_fields(new_fields)[None],
],
axis=0,
)
fields = self._model.fields_from_U(
self._interp_cache(self._next_time_step), fields
)
self._internal_iter += 1
fields = hook(t, fields)
return self._next_time_step, fields
t = new_t
fields = new_fields.copy()
self._internal_iter += 1
if self._internal_iter > (
self._max_iter if self._max_iter else self._internal_iter + 1
):
raise RuntimeError(
"Rosenbrock internal iteration " "above max iterations authorized"
)
if dt < (self._dt_min if self._dt_min else dt * 0.5):
raise RuntimeError(
"Rosenbrock internal time step " "less than authorized"
)
[docs]@register_temporal_scheme
class ROS2(ROW_general):
"""Second order Rosenbrock scheme, without time stepping
Parameters
----------
model : skfdiff.Model
skfdiff Model
"""
name = "ROS2"
embeded_timestepping = False
def __init__(self, model):
gamma = np.array(
[[2.928932188134e-1, 0], [-5.857864376269e-1, 2.928932188134e-1]]
)
alpha = np.array([[0, 0], [1, 0]])
b = np.array([1 / 2, 1 / 2])
super().__init__(model, alpha, gamma, b, time_stepping=False)
[docs]@register_temporal_scheme
class ROS3PRw(ROW_general):
"""Third order Rosenbrock scheme, with time stepping
Parameters
----------
model : skfdiff.Model
skfdiff Model
tol : float, optional, default 1E-2
tolerance factor for the time stepping. The time step will adapt to
ensure that the maximum relative error on all fields stay under that
value.
time_stepping : bool, optional, default True
allow a variable internal time-step to ensure good agreement between
computing performance and accuracy.
max_iter : float or None, optional, default None
maximum internal iteration allowed
dt_min : float or None, optional, default None
minimum internal time step allowed
recompute_target : bool, optional, default False
if True a new computation is done when the target time is exceeded,
interpolation used otherwise.
""" # noqa
name = "ROS3PRw"
embeded_timestepping = True
def __init__(
self,
model,
tol=1e-1,
time_stepping=True,
max_iter=None,
dt_min=None,
safety_factor=0.9,
solver="auto",
recompute_target=True,
iteratif_atol=1e-3,
):
alpha = np.zeros((3, 3))
gamma = np.zeros((3, 3))
gamma_i = 7.8867513459481287e-01
b = [5.0544867840851759e-01, -1.1571687603637559e-01, 6.1026819762785800e-01]
b_pred = [
2.8973180237214197e-01,
1.0000000000000001e-01,
6.1026819762785800e-01,
]
alpha[1, 0] = 2.3660254037844388e00
alpha[2, 0] = 5.0000000000000000e-01
alpha[2, 1] = 7.6794919243112270e-01
gamma[0, 0] = gamma[1, 1] = gamma[2, 2] = gamma_i
gamma[1, 0] = -2.3660254037844388e00
gamma[2, 0] = -8.6791218280355165e-01
gamma[2, 1] = -8.7306695894642317e-01
super().__init__(
model,
alpha,
gamma,
b,
tol=tol,
b_pred=b_pred,
time_stepping=time_stepping,
max_iter=max_iter,
dt_min=dt_min,
safety_factor=safety_factor,
solver=solver,
recompute_target=recompute_target,
iteratif_atol=iteratif_atol,
)
[docs]@register_temporal_scheme
class ROS3PRL(ROW_general):
"""4th order Rosenbrock scheme, with time stepping
Parameters
----------
model : skfdiff.Model
skfdiff Model
tol : float, optional, default 1E-2
tolerance factor for the time stepping. The time step will adapt to
ensure that the maximum relative error on all fields stay under that
value.
time_stepping : bool, optional, default True
allow a variable internal time-step to ensure good agreement between
computing performance and accuracy.
max_iter : float or None, optional, default None
maximum internal iteration allowed
dt_min : float or None, optional, default None
minimum internal time step allowed
recompute_target : bool, optional, default False
if True a new computation is done when the target time is exceeded,
interpolation used otherwise.
""" # noqa
name = "ROS3PRL"
embeded_timestepping = True
def __init__(
self,
model,
tol=1e-1,
time_stepping=True,
max_iter=None,
dt_min=None,
safety_factor=0.9,
solver="auto",
recompute_target=True,
iteratif_atol=1e-3,
):
alpha = np.zeros((4, 4))
gamma = np.zeros((4, 4))
gamma_i = 4.3586652150845900e-01
b = [
2.1103008548132443e-03,
8.8607515441580453e-01,
-3.2405197677907682e-01,
4.3586652150845900e-01,
]
b_pred = [
5.0000000000000000e-01,
3.8752422953298199e-01,
-2.0949226315045236e-01,
3.2196803361747034e-01,
]
alpha[1, 0] = 0.5
alpha[2, 0] = 0.5
alpha[2, 1] = 0.5
alpha[3, 0] = 0.5
alpha[3, 1] = 0.5
alpha[3, 2] = 0
for i in range(len(b)):
gamma[i, i] = gamma_i
gamma[1, 0] = -5.0000000000000000e-01
gamma[2, 0] = -7.9156480420464204e-01
gamma[2, 1] = 3.5244216792751432e-01
gamma[3, 0] = -4.9788969914518677e-01
gamma[3, 1] = 3.8607515441580453e-01
gamma[3, 2] = -3.2405197677907682e-01
super().__init__(
model,
alpha,
gamma,
b,
b_pred=b_pred,
tol=tol,
time_stepping=time_stepping,
max_iter=max_iter,
dt_min=dt_min,
safety_factor=safety_factor,
solver=solver,
recompute_target=recompute_target,
iteratif_atol=iteratif_atol,
)
[docs]@register_temporal_scheme
class RODASPR(ROW_general):
"""6th order Rosenbrock scheme, with time stepping
Parameters
----------
model : skfdiff.Model
skfdiff Model
tol : float, optional, default 1E-2
tolerance factor for the time stepping. The time step will adapt to
ensure that the maximum relative error on all fields stay under that
value.
time_stepping : bool, optional, default True
allow a variable internal time-step to ensure good agreement between
computing performance and accuracy.
max_iter : float or None, optional, default None
maximum internal iteration allowed
dt_min : float or None, optional, default None
minimum internal time step allowed
recompute_target : bool, optional, default False
if True a new computation is done when the target time is exceeded,
interpolation used otherwise.
""" # noqa
name = "RODASPR"
embeded_timestepping = True
def __init__(
self,
model,
tol=1e-1,
time_stepping=True,
max_iter=None,
dt_min=None,
safety_factor=0.9,
solver="auto",
recompute_target=True,
iteratif_atol=1e-3,
):
alpha = np.zeros((6, 6))
gamma = np.zeros((6, 6))
b = [
-7.9683251690137014e-1,
6.2136401428192344e-2,
1.1198553514719862e00,
4.7198362114404874e-1,
-1.0714285714285714e-1,
2.5e-1,
]
b_pred = [
-7.3844531665375115e0,
-3.0593419030174646e-1,
7.8622074209377981e0,
5.7817993590145966e-1,
2.5e-1,
0,
]
alpha[1, 0] = 7.5e-1
alpha[2, 0] = 7.5162877593868457e-2
alpha[2, 1] = 2.4837122406131545e-2
alpha[3, 0] = 1.6532708886396510e0
alpha[3, 1] = 2.1545706385445562e-1
alpha[3, 2] = -1.3157488872766792e0
alpha[4, 0] = 1.9385003738039885e1
alpha[4, 1] = 1.2007117225835324e0
alpha[4, 2] = -1.9337924059522791e1
alpha[4, 3] = -2.4779140110062559e-1
alpha[5, 0] = -7.3844531665375115e0
alpha[5, 1] = -3.0593419030174646e-1
alpha[5, 2] = 7.8622074209377981e0
alpha[5, 3] = 5.7817993590145966e-1
alpha[5, 4] = 2.5e-1
gamma_i = 0.25
for i in range(len(b)):
gamma[i, i] = gamma_i
gamma[1, 0] = -7.5e-1
gamma[2, 0] = -8.8644e-2
gamma[2, 1] = -2.868897e-2
gamma[3, 0] = -4.84700e0
gamma[3, 1] = -3.1583e-1
gamma[3, 2] = 4.9536568e0
gamma[4, 0] = -2.67694569e1
gamma[4, 1] = -1.5066459e0
gamma[4, 2] = 2.720013e1
gamma[4, 3] = 8.25971337e-1
gamma[5, 0] = 6.58762e0
gamma[5, 1] = 3.6807059e-1
gamma[5, 2] = -6.74235e0
gamma[5, 3] = -1.061963e-1
gamma[5, 4] = -3.57142857e-1
super().__init__(
model,
alpha,
gamma,
b,
b_pred=b_pred,
tol=tol,
time_stepping=time_stepping,
max_iter=max_iter,
dt_min=dt_min,
safety_factor=safety_factor,
solver=solver,
recompute_target=recompute_target,
iteratif_atol=iteratif_atol,
)
[docs]@register_temporal_scheme
class scipy_ivp(TemporalScheme):
"""Proxy written around the scipy.integrate.solve_ivp function.
Give access to all
the scipy integrators.
Parameters
----------
model : skfdiff.Model
skfdiff Model
method : str, optional, default 'RK45'
name of the chosen scipy integration scheme.
**integrator_kwargs
extra arguments provided to the scipy integration scheme.
""" # noqa
name = "scipy_ivp"
embeded_timestepping = True
def __init__(self, model, use_jac=True, method="RK45", **integrator_kwargs):
self._model = model
self._method = method
try:
model.J
except AttributeError:
if method not in ["RK45", "RK23"]:
warnings.warn(
"Jacobian computation routine not available. "
"Scipy will try to estimate it, it can be really expensive. "
"It's advised to switch to an explicite method, or to change "
"the backend."
)
use_jac = False
self._use_jac = use_jac if method not in ["RK45", "RK23"] else False
self._integrator_kwargs = integrator_kwargs
def func_scipy_proxy(t, U, fields, hook):
fields = self._model.fields_from_U(U, fields)
fields = hook(t, fields)
return self._model.F(fields, t)
def jacob_scipy_proxy(t, U, fields, hook):
fields = self._model.fields_from_U(U, fields)
fields = hook(t, fields)
return self._model.J(fields, t)
self.func = func_scipy_proxy
self.jac = jacob_scipy_proxy
def __call__(self, t, fields, dt, hook=null_hook):
"""Perform a step of the solver: took a time and a system state as a
skfdiff Fields container and return the next time step with updated
container.
Parameters
----------
t : float
actual time step
fields : skfdiff.Fields
actual system state in a skfdiff Fields
dt : float
temporal step-size
pars : dict
physical parameters of the model
hook : callable, optional
any callable taking the actual time, fields and parameters and
return modified fields and parameters. Will be called every
internal time step and can be used to include time dependent or
conditionnal parameters, boundary conditions...
container
Returns
-------
tuple : t, fields
updated time and fields container
Raises
------
RuntimeError
Description
""" # noqa
fields = hook(t, fields)
U = self._model.U_from_fields(fields)
if self._use_jac:
self._integrator_kwargs["jac"] = partial(self.jac, fields=fields, hook=hook)
results = solve_ivp(
fun=partial(self.func, fields=fields, hook=hook),
t_span=[t, t + dt],
t_eval=[t + dt],
y0=U,
method=self._method,
**self._integrator_kwargs
)
fields = self._model.fields_from_U(results.y, fields)
fields = hook(t + dt, fields)
return t + dt, fields
[docs]@register_temporal_scheme
class Theta(TemporalScheme):
"""Simple theta-based scheme where theta is a weight
if theta = 0, the scheme is a forward-euler scheme
if theta = 1, the scheme is a backward-euler scheme
if theta = 0.5, the scheme is called a Crank-Nicolson scheme
Parameters
----------
model : skfdiff.Model
skfdiff Model
theta : int, optional, default 1
weight of the theta-scheme
solver : callable, optional, default scipy.sparse.linalg.spsolve
method able to solve a Ax = b linear equation with A a sparse matrix.
Take A and b as argument and return x.
""" # noqa
name = "theta"
embeded_timestepping = False
def __init__(self, model, theta=1, solver=sps.linalg.spsolve):
self._model = model
self._theta = theta
self._solver = solver
def __call__(self, t, fields, dt, hook=null_hook):
"""Perform a step of the solver: took a time and a system state as a
skfdiff Fields container and return the next time step with updated
container.
Parameters
----------
t : float
actual time step
fields : skfdiff.Fields
actual system state in a skfdiff Fields container
dt : float
temporal step-size
pars : dict
physical parameters of the model
hook : callable, optional
any callable taking the actual time, fields and parameters and
return modified fields and parameters. Will be called every
internaltime step and can be used to include time dependent or
conditionnal parameters, boundary conditions...
Returns
-------
tuple : t, fields
updated time and fields container
""" # noqa
fields = fields.copy()
fields = hook(t, fields)
U = self._model.U_from_fields(fields)
F = self._model.F(fields, t)
if self._theta != 0:
J = self._model.J(fields, t)
B = dt * (F - self._theta * J @ U) + U
J = sps.identity(U.size, format="csc") - self._theta * dt * J
new_U = self._solver(J, B)
else:
new_U = dt * F + U
if isinstance(new_U, tuple):
new_U = new_U[0]
fields = self._model.fields_from_U(new_U, fields)
fields = hook(t + dt, fields)
return t + dt, fields
[docs]class Stationnary:
"""Solve the stationnary problem F(U) = 0
Parameters
----------
model : skfdiff.Model
skfdiff Model
solver : callable, optional, default scipy.sparse.linalg.spsolve
method able to solve a Ax = b linear equation with A a sparse matrix.
Take A and b as argument and return x.
""" # noqa
def __init__(self, model, solver=sps.linalg.spsolve):
self._model = model
self._solver = solver
def __call__(self, fields):
"""Perform a step of the solver: took a time and a system state as a
skfdiff Fields container and return the next time step with updated
container.
Parameters
----------
t : float
actual time step
fields : skfdiff.Fields
actual system state in a skfdiff Fields container
dt : float
temporal step-size
pars : dict
physical parameters of the model
hook : callable, optional
any callable taking the actual time, fields and parameters and
return modified fields and parameters. Will be called every
internaltime step and can be used to include time dependent or
conditionnal parameters, boundary conditions...
Returns
-------
tuple : t, fields
updated time and fields container
""" # noqa
fields = fields.copy()
U0 = self._model.U_from_fields(fields)
J0 = self._model.J(fields)
F0 = self._model.F(fields)
new_U = self._solver(J0, J0 @ U0 - F0)
if isinstance(new_U, tuple):
new_U = new_U[0]
fields = self._model.fields_from_U(new_U, fields)
return fields