Source code for skfdiff.core.temporal_schemes

#!/usr/bin/env python
# coding=utf-8
"""This module regroups all the implemented temporal schemes.
They are written as callable class which take the model and some control
arguments at the init, and perform a computation step every time they are
called.

The following solvers are implemented:
    * Backward and Forward Euler, Crank-Nicolson method (with the Theta class)
    * Some Rosenbrock Wanner schemes (up to the 6th order) with time controler
    * All the scipy.integrate.ode integrators with the scipy_ode class.
"""

import warnings
from functools import partial, wraps

import numpy as np
import scipy.sparse as sps
from loguru import logger
from scipy.integrate import solve_ivp
from scipy.interpolate import interp1d
from scipy.linalg import norm
from toolz import memoize

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", module="fuzz")
    from fuzzywuzzy import process

available_temporal_schemes = {}


[docs]def get_temporal_scheme(name): """get a temporal_scheme by its name Arguments: name {str} -- temporal_scheme name Raises: NotImplementedError -- raised if the temporal_scheme is not available. Returns: TemporalScheme -- the requested temporal_scheme """ try: return available_temporal_schemes[name.lower()] except KeyError: err_msg = "%s temporal_scheme is not registered." % name with warnings.catch_warnings(): warnings.filterwarnings("ignore") (suggest, score), = process.extract( name, available_temporal_schemes.keys(), limit=1 ) if score > 70: err_msg += ( "\n%s is available and seems to be close. " "It may be what you are looking for !" % suggest ) err_msg += "\nFull list of available temporal_schemes:\n\t- %s" % ( "\n\t- ".join(available_temporal_schemes.keys()) ) raise NotImplementedError(err_msg)
[docs]def register_temporal_scheme(CustomTemporalScheme): global available_temporal_schemes if TemporalScheme not in CustomTemporalScheme.__mro__: raise AttributeError( "The provider temporal_scheme should inherit from the " "TemporalScheme base class." ) available_temporal_schemes[CustomTemporalScheme.name.lower()] = CustomTemporalScheme return CustomTemporalScheme
[docs]def null_hook(t, fields): return fields
[docs]def add_time_stepping(scheme, tol=1e-2, ord=2, m=20, reject_factor=2): internal_dt = None def one_step(t, fields, dt, hook): dt_ = dt while True: _, fields_ = scheme(t, fields, m * dt_, hook) for _ in range(m): t, fields = scheme(t, fields, dt_, hook) errs = [ np.linalg.norm( np.atleast_1d(fields[key]) - np.atleast_1d(fields_[key]), ord ) / (m ** 2 - 1) for key in fields.data_vars ] err = max(errs) dt_ = np.sqrt(dt ** 2 * tol / err) if dt_ < dt / reject_factor: continue break return t, fields, dt_ @wraps(scheme) def adaptatif_scheme(t, fields, dt, hook=null_hook): nonlocal internal_dt next_step = t + dt internal_dt = internal_dt if internal_dt else dt while t + internal_dt <= next_step: t, fields, internal_dt = one_step(t, fields, internal_dt / m, hook) if t < next_step: t, fields = scheme(t, fields, next_step - t, hook) return t, fields return adaptatif_scheme
[docs]class TemporalScheme: pass
[docs]class ROW_general(TemporalScheme): """Rosenbrock Wanner class of temporal solvers The implementation and the different parameters can be found in http://www.digibib.tu-bs.de/?docid=00055262 """ @memoize def __cache__(self, N): Id = sps.eye(N, format="csc") return Id def __init__( self, model, alpha, gamma, b, b_pred=None, time_stepping=False, tol=None, max_iter=None, dt_min=None, safety_factor=0.9, solver="auto", recompute_target=True, iteratif_atol=1e-3, initial_dt=None, ): self._internal_dt = 1e-6 if initial_dt is None else initial_dt self._model = model self._alpha = alpha self._gamma = gamma self._b = b self._b_pred = b_pred self._s = len(b) self._time_control = time_stepping self._internal_iter = None self._tol = tol self._safety_factor = safety_factor self._max_iter = max_iter self._dt_min = dt_min self._recompute_target = recompute_target self._solver = solver self._interp_cache = None self._atol = iteratif_atol def __call__(self, t, fields, dt, hook=null_hook): """Perform a step of the solver: took a time and a system state as a skfdiff Fields container and return the next time step with updated container. Parameters ---------- t : float actual time step fields : skfdiff.Fields actual system state in a skfdiff Fields dt : float temporal step-size pars : dict physical parameters of the model hook : callable, optional any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internal time step and can be used to include time dependent or conditionnal parameters, boundary conditions... container Returns ------- tuple : t, fields updated time and fields container Raises ------ NotImplementedError raised if a time stepping is requested but the scheme do not provide the b predictor coefficients. ValueError raised if time_stepping is True and tol is not provided. """ # noqa if self._time_control: return self._variable_step(t, fields, dt, hook=hook) t, fields, _ = self._fixed_step(t, fields, dt, hook=hook) fields = hook(t, fields) return t, fields def _fixed_step(self, t, fields, dt, hook=null_hook): fields = fields.copy() fields = hook(t, fields) J = self._model.J(fields, t) if self._solver == "auto": self._solver = "iteratif" if J.size > 1e4 else "direct" U = self._model.U_from_fields(fields) Id = self.__cache__(U.size) self._A = A = Id - self._gamma[0, 0] * dt * J if self._solver == "direct": luf = sps.linalg.factorized(A) dtF = ( self._model.F(hook(t + dt, fields), t + dt) - self._model.F(fields, t) ) / dt ks = [] for i in np.arange(self._s): alpha_i = sum([self._alpha[i, j] for j in range(i)]) gamma_i = sum([self._gamma[i, j] for j in range(i)]) t_i = t + alpha_i * dt fields_i = hook( t_i, self._model.fields_from_U( U + dt * sum([self._alpha[i, j] * ks[j] for j in range(i)]), fields ), ) F = self._model.F(fields_i, t_i) B = ( F + dt * ( J @ sum([self._gamma[i, j] * ks[j] for j in range(i)]) if i > 0 else 0 ) + dt * gamma_i * dtF ) if self._solver == "direct": Utilde = luf(B) elif self._solver == "iteratif": Utilde = sps.linalg.gcrotmk( A, B, U if not ks else ks[-1], atol=self._atol )[0] else: Utilde = self._solver(A, B) ks.append(Utilde) U = U + dt * sum([bi * ki for bi, ki in zip(self._b, ks)]) U_pred = ( U + dt * sum([bi * ki for bi, ki in zip(self._b_pred, ks)]) if self._b_pred is not None else None ) fields = hook(t + dt, self._model.fields_from_U(U, fields)) err = norm(U - U_pred, np.inf) if U_pred is not None else None return t + dt, fields, err def _iter_until_reaching_tol(self, fields, t, dt, hook): while self._err is None or self._err > self._tol: new_t, new_fields, self._err = self._fixed_step(t, fields, dt, hook) logger.debug("error: {}".format(self._err)) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) computed_dt = self._safety_factor * dt * np.sqrt(self._tol / self._err) if np.isinf(computed_dt): computed_dt = 1 / self._safety_factor * dt dt = self._internal_dt = computed_dt return new_t, dt, new_fields def _variable_step(self, t, fields, dt, hook=null_hook): self._next_time_step = t + dt self._max_dt = t self._internal_iter = 0 try: fields = self._model.fields_from_U( self._interp_cache(self._next_time_step), fields ) return self._next_time_step, fields except (TypeError, ValueError): pass if not self._recompute_target: dt = self._internal_dt else: dt = self._internal_dt = min(self._internal_dt, dt) while True: self._err = None new_t, dt, new_fields = self._iter_until_reaching_tol(fields, t, dt, hook) logger.debug("dt computed after err below tol: %g" % dt) logger.debug("ROS_vart, t %g" % t) if new_t >= self._next_time_step: target_dt = self._next_time_step - t logger.debug( "new t more than next expected time step, " "compute with proper timestep %g" % target_dt ) if self._recompute_target: t, fields, self._err = self._fixed_step( t, fields, self._next_time_step - t, hook ) else: self._interp_cache = interp1d( [t, new_t], [ self._model.U_from_fields(fields)[None], self._model.U_from_fields(new_fields)[None], ], axis=0, ) fields = self._model.fields_from_U( self._interp_cache(self._next_time_step), fields ) self._internal_iter += 1 fields = hook(t, fields) return self._next_time_step, fields t = new_t fields = new_fields.copy() self._internal_iter += 1 if self._internal_iter > ( self._max_iter if self._max_iter else self._internal_iter + 1 ): raise RuntimeError( "Rosenbrock internal iteration " "above max iterations authorized" ) if dt < (self._dt_min if self._dt_min else dt * 0.5): raise RuntimeError( "Rosenbrock internal time step " "less than authorized" )
[docs]@register_temporal_scheme class ROS2(ROW_general): """Second order Rosenbrock scheme, without time stepping Parameters ---------- model : skfdiff.Model skfdiff Model """ name = "ROS2" embeded_timestepping = False def __init__(self, model): gamma = np.array( [[2.928932188134e-1, 0], [-5.857864376269e-1, 2.928932188134e-1]] ) alpha = np.array([[0, 0], [1, 0]]) b = np.array([1 / 2, 1 / 2]) super().__init__(model, alpha, gamma, b, time_stepping=False)
[docs]@register_temporal_scheme class ROS3PRw(ROW_general): """Third order Rosenbrock scheme, with time stepping Parameters ---------- model : skfdiff.Model skfdiff Model tol : float, optional, default 1E-2 tolerance factor for the time stepping. The time step will adapt to ensure that the maximum relative error on all fields stay under that value. time_stepping : bool, optional, default True allow a variable internal time-step to ensure good agreement between computing performance and accuracy. max_iter : float or None, optional, default None maximum internal iteration allowed dt_min : float or None, optional, default None minimum internal time step allowed recompute_target : bool, optional, default False if True a new computation is done when the target time is exceeded, interpolation used otherwise. """ # noqa name = "ROS3PRw" embeded_timestepping = True def __init__( self, model, tol=1e-1, time_stepping=True, max_iter=None, dt_min=None, safety_factor=0.9, solver="auto", recompute_target=True, iteratif_atol=1e-3, ): alpha = np.zeros((3, 3)) gamma = np.zeros((3, 3)) gamma_i = 7.8867513459481287e-01 b = [5.0544867840851759e-01, -1.1571687603637559e-01, 6.1026819762785800e-01] b_pred = [ 2.8973180237214197e-01, 1.0000000000000001e-01, 6.1026819762785800e-01, ] alpha[1, 0] = 2.3660254037844388e00 alpha[2, 0] = 5.0000000000000000e-01 alpha[2, 1] = 7.6794919243112270e-01 gamma[0, 0] = gamma[1, 1] = gamma[2, 2] = gamma_i gamma[1, 0] = -2.3660254037844388e00 gamma[2, 0] = -8.6791218280355165e-01 gamma[2, 1] = -8.7306695894642317e-01 super().__init__( model, alpha, gamma, b, tol=tol, b_pred=b_pred, time_stepping=time_stepping, max_iter=max_iter, dt_min=dt_min, safety_factor=safety_factor, solver=solver, recompute_target=recompute_target, iteratif_atol=iteratif_atol, )
[docs]@register_temporal_scheme class ROS3PRL(ROW_general): """4th order Rosenbrock scheme, with time stepping Parameters ---------- model : skfdiff.Model skfdiff Model tol : float, optional, default 1E-2 tolerance factor for the time stepping. The time step will adapt to ensure that the maximum relative error on all fields stay under that value. time_stepping : bool, optional, default True allow a variable internal time-step to ensure good agreement between computing performance and accuracy. max_iter : float or None, optional, default None maximum internal iteration allowed dt_min : float or None, optional, default None minimum internal time step allowed recompute_target : bool, optional, default False if True a new computation is done when the target time is exceeded, interpolation used otherwise. """ # noqa name = "ROS3PRL" embeded_timestepping = True def __init__( self, model, tol=1e-1, time_stepping=True, max_iter=None, dt_min=None, safety_factor=0.9, solver="auto", recompute_target=True, iteratif_atol=1e-3, ): alpha = np.zeros((4, 4)) gamma = np.zeros((4, 4)) gamma_i = 4.3586652150845900e-01 b = [ 2.1103008548132443e-03, 8.8607515441580453e-01, -3.2405197677907682e-01, 4.3586652150845900e-01, ] b_pred = [ 5.0000000000000000e-01, 3.8752422953298199e-01, -2.0949226315045236e-01, 3.2196803361747034e-01, ] alpha[1, 0] = 0.5 alpha[2, 0] = 0.5 alpha[2, 1] = 0.5 alpha[3, 0] = 0.5 alpha[3, 1] = 0.5 alpha[3, 2] = 0 for i in range(len(b)): gamma[i, i] = gamma_i gamma[1, 0] = -5.0000000000000000e-01 gamma[2, 0] = -7.9156480420464204e-01 gamma[2, 1] = 3.5244216792751432e-01 gamma[3, 0] = -4.9788969914518677e-01 gamma[3, 1] = 3.8607515441580453e-01 gamma[3, 2] = -3.2405197677907682e-01 super().__init__( model, alpha, gamma, b, b_pred=b_pred, tol=tol, time_stepping=time_stepping, max_iter=max_iter, dt_min=dt_min, safety_factor=safety_factor, solver=solver, recompute_target=recompute_target, iteratif_atol=iteratif_atol, )
[docs]@register_temporal_scheme class RODASPR(ROW_general): """6th order Rosenbrock scheme, with time stepping Parameters ---------- model : skfdiff.Model skfdiff Model tol : float, optional, default 1E-2 tolerance factor for the time stepping. The time step will adapt to ensure that the maximum relative error on all fields stay under that value. time_stepping : bool, optional, default True allow a variable internal time-step to ensure good agreement between computing performance and accuracy. max_iter : float or None, optional, default None maximum internal iteration allowed dt_min : float or None, optional, default None minimum internal time step allowed recompute_target : bool, optional, default False if True a new computation is done when the target time is exceeded, interpolation used otherwise. """ # noqa name = "RODASPR" embeded_timestepping = True def __init__( self, model, tol=1e-1, time_stepping=True, max_iter=None, dt_min=None, safety_factor=0.9, solver="auto", recompute_target=True, iteratif_atol=1e-3, ): alpha = np.zeros((6, 6)) gamma = np.zeros((6, 6)) b = [ -7.9683251690137014e-1, 6.2136401428192344e-2, 1.1198553514719862e00, 4.7198362114404874e-1, -1.0714285714285714e-1, 2.5e-1, ] b_pred = [ -7.3844531665375115e0, -3.0593419030174646e-1, 7.8622074209377981e0, 5.7817993590145966e-1, 2.5e-1, 0, ] alpha[1, 0] = 7.5e-1 alpha[2, 0] = 7.5162877593868457e-2 alpha[2, 1] = 2.4837122406131545e-2 alpha[3, 0] = 1.6532708886396510e0 alpha[3, 1] = 2.1545706385445562e-1 alpha[3, 2] = -1.3157488872766792e0 alpha[4, 0] = 1.9385003738039885e1 alpha[4, 1] = 1.2007117225835324e0 alpha[4, 2] = -1.9337924059522791e1 alpha[4, 3] = -2.4779140110062559e-1 alpha[5, 0] = -7.3844531665375115e0 alpha[5, 1] = -3.0593419030174646e-1 alpha[5, 2] = 7.8622074209377981e0 alpha[5, 3] = 5.7817993590145966e-1 alpha[5, 4] = 2.5e-1 gamma_i = 0.25 for i in range(len(b)): gamma[i, i] = gamma_i gamma[1, 0] = -7.5e-1 gamma[2, 0] = -8.8644e-2 gamma[2, 1] = -2.868897e-2 gamma[3, 0] = -4.84700e0 gamma[3, 1] = -3.1583e-1 gamma[3, 2] = 4.9536568e0 gamma[4, 0] = -2.67694569e1 gamma[4, 1] = -1.5066459e0 gamma[4, 2] = 2.720013e1 gamma[4, 3] = 8.25971337e-1 gamma[5, 0] = 6.58762e0 gamma[5, 1] = 3.6807059e-1 gamma[5, 2] = -6.74235e0 gamma[5, 3] = -1.061963e-1 gamma[5, 4] = -3.57142857e-1 super().__init__( model, alpha, gamma, b, b_pred=b_pred, tol=tol, time_stepping=time_stepping, max_iter=max_iter, dt_min=dt_min, safety_factor=safety_factor, solver=solver, recompute_target=recompute_target, iteratif_atol=iteratif_atol, )
[docs]@register_temporal_scheme class scipy_ivp(TemporalScheme): """Proxy written around the scipy.integrate.solve_ivp function. Give access to all the scipy integrators. Parameters ---------- model : skfdiff.Model skfdiff Model method : str, optional, default 'RK45' name of the chosen scipy integration scheme. **integrator_kwargs extra arguments provided to the scipy integration scheme. """ # noqa name = "scipy_ivp" embeded_timestepping = True def __init__(self, model, use_jac=True, method="RK45", **integrator_kwargs): self._model = model self._method = method try: model.J except AttributeError: if method not in ["RK45", "RK23"]: warnings.warn( "Jacobian computation routine not available. " "Scipy will try to estimate it, it can be really expensive. " "It's advised to switch to an explicite method, or to change " "the backend." ) use_jac = False self._use_jac = use_jac if method not in ["RK45", "RK23"] else False self._integrator_kwargs = integrator_kwargs def func_scipy_proxy(t, U, fields, hook): fields = self._model.fields_from_U(U, fields) fields = hook(t, fields) return self._model.F(fields, t) def jacob_scipy_proxy(t, U, fields, hook): fields = self._model.fields_from_U(U, fields) fields = hook(t, fields) return self._model.J(fields, t) self.func = func_scipy_proxy self.jac = jacob_scipy_proxy def __call__(self, t, fields, dt, hook=null_hook): """Perform a step of the solver: took a time and a system state as a skfdiff Fields container and return the next time step with updated container. Parameters ---------- t : float actual time step fields : skfdiff.Fields actual system state in a skfdiff Fields dt : float temporal step-size pars : dict physical parameters of the model hook : callable, optional any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internal time step and can be used to include time dependent or conditionnal parameters, boundary conditions... container Returns ------- tuple : t, fields updated time and fields container Raises ------ RuntimeError Description """ # noqa fields = hook(t, fields) U = self._model.U_from_fields(fields) if self._use_jac: self._integrator_kwargs["jac"] = partial(self.jac, fields=fields, hook=hook) results = solve_ivp( fun=partial(self.func, fields=fields, hook=hook), t_span=[t, t + dt], t_eval=[t + dt], y0=U, method=self._method, **self._integrator_kwargs ) fields = self._model.fields_from_U(results.y, fields) fields = hook(t + dt, fields) return t + dt, fields
[docs]@register_temporal_scheme class Theta(TemporalScheme): """Simple theta-based scheme where theta is a weight if theta = 0, the scheme is a forward-euler scheme if theta = 1, the scheme is a backward-euler scheme if theta = 0.5, the scheme is called a Crank-Nicolson scheme Parameters ---------- model : skfdiff.Model skfdiff Model theta : int, optional, default 1 weight of the theta-scheme solver : callable, optional, default scipy.sparse.linalg.spsolve method able to solve a Ax = b linear equation with A a sparse matrix. Take A and b as argument and return x. """ # noqa name = "theta" embeded_timestepping = False def __init__(self, model, theta=1, solver=sps.linalg.spsolve): self._model = model self._theta = theta self._solver = solver def __call__(self, t, fields, dt, hook=null_hook): """Perform a step of the solver: took a time and a system state as a skfdiff Fields container and return the next time step with updated container. Parameters ---------- t : float actual time step fields : skfdiff.Fields actual system state in a skfdiff Fields container dt : float temporal step-size pars : dict physical parameters of the model hook : callable, optional any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internaltime step and can be used to include time dependent or conditionnal parameters, boundary conditions... Returns ------- tuple : t, fields updated time and fields container """ # noqa fields = fields.copy() fields = hook(t, fields) U = self._model.U_from_fields(fields) F = self._model.F(fields, t) if self._theta != 0: J = self._model.J(fields, t) B = dt * (F - self._theta * J @ U) + U J = sps.identity(U.size, format="csc") - self._theta * dt * J new_U = self._solver(J, B) else: new_U = dt * F + U if isinstance(new_U, tuple): new_U = new_U[0] fields = self._model.fields_from_U(new_U, fields) fields = hook(t + dt, fields) return t + dt, fields
[docs]class Stationnary: """Solve the stationnary problem F(U) = 0 Parameters ---------- model : skfdiff.Model skfdiff Model solver : callable, optional, default scipy.sparse.linalg.spsolve method able to solve a Ax = b linear equation with A a sparse matrix. Take A and b as argument and return x. """ # noqa def __init__(self, model, solver=sps.linalg.spsolve): self._model = model self._solver = solver def __call__(self, fields): """Perform a step of the solver: took a time and a system state as a skfdiff Fields container and return the next time step with updated container. Parameters ---------- t : float actual time step fields : skfdiff.Fields actual system state in a skfdiff Fields container dt : float temporal step-size pars : dict physical parameters of the model hook : callable, optional any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internaltime step and can be used to include time dependent or conditionnal parameters, boundary conditions... Returns ------- tuple : t, fields updated time and fields container """ # noqa fields = fields.copy() U0 = self._model.U_from_fields(fields) J0 = self._model.J(fields) F0 = self._model.F(fields) new_U = self._solver(J0, J0 @ U0 - F0) if isinstance(new_U, tuple): new_U = new_U[0] fields = self._model.fields_from_U(new_U, fields) return fields