skfdiff package

scikit-fdiff model

class skfdiff.model.Model(evolution_equations, unknowns, parameters=None, coordinates=None, boundary_conditions=None, subs=None, custom_functions=None, backend='numpy', backend_kwargs=None)[source]

Bases: object

Contain finite difference approximation and routine of the dynamical system.

Take a mathematical form as input, use Sympy to transform it as a symbolic expression, perform the finite difference approximation and expose theano optimized routine for both the right hand side of the dynamical system and Jacobian matrix approximation.

Parameters:
  • evolution_equations (Union[Sequence[str], str]) – the right hand sides of the partial differential equations written as \(\\frac{\\partial U}{\\partial t} = F(U)\), where the spatial derivative can be written as dxxU or dx(U, 2) or with the sympy notation Derivative(U, x, x)

  • unknowns (Union[Sequence[str], str]) – the dependent variables with the same order as the temporal derivative of the previous arg.

  • parameters (Optional[Union[Sequence[str], str]]) – list of the parameters. Can be feed with a scalar of an array with the same size. They can be derivated in space as well.

  • boundary_conditions (Optional[Union[str, Dict[str, Dict[str, Tuple[str, str]]]]]) – Can be either “noflux” (default behavior), “periodic”, or a dictionary that follow this structure : {dependent_var: {indep_var: (left_boundary, right_boundary)}}, the boundaries being written as residual (rhs - lhs = 0). For example, an hybrid Dirichlet / Neumann flux will be written as {“U”: {“x”: (dxU - 2, U - 3)}}. If a boundary is None, a nul flux will be applied.

  • subs (Optional[Dict[str, str]]) – Substitution dictionnary, useful to write complex systems. The key of this dictionnary will be substitued everywhere in the evolution equations as well as in boundary conditions.

  • backend (str, default to "numpy") – A registered backend that will take the discretized system and expose the evolution equations routine F as well as an optional Jacobian routine J. If the later is not provided by the backend, the implicit methods will not be available. TODO: make an efficient jacobian approximation for that case, given the bandwidth. For now, there is the numpy backend, that provide efficient (but not optimal) computation with minimal dependencies, and numba backend that provide a LLVM based routine that allow faster parallel computation at the costs of a (sometime long) warm-up. The numba one is less tested and stable, but provide a potential huge speed-up for explicit methods.

  • backend_kwargs (Optional[Dict] : supplementary arguments provided to the backend.) –

Examples

A simple diffusion equation:

>>> from skfdiff import Model
>>> model = Model("k * dxxU", "U", "k")

A coupled system of convection-diffusion equation:

>>> from skfdiff import Model
>>> model = Model(["k1 * dxxU - c1 * dxV",
...                "k2 * dxxV - c2 * dxU",],
...                ["U", "V"], ["k1", "k2", "c1", "c2"])

A 2D pure advection model, without / with upwind scheme.

>>> from skfdiff import Model
>>> model = Model("c_x * dxU + c_y * dyU", "U(x, y)",
...               ["c_x", "c_y"])
>>> model = Model("upwind(c_x, U, x, 2) + upwind(c_y, U, y, 2)",
...               "U(x, y)",
...               ["c_x", "c_y"])

A 2D diffusion model, with hybrid boundary conditions (Dirichlet, Neuman, Robin and No-flux).

>>> from skfdiff import Model
>>> model = Model("dxxU + dyyU", "U(x, y)",
...               boundary_conditions={("U", "x"): ("U - 3", "dxU + 5"),
...                                    ("U", "y"): ("dyU - (U - 3)", None)
...                                   })
F(fields, t=0)[source]

Compute the right hand side of the dynamical system

property Fields

Base for structured fields stored in a xarray.

J(fields, t=0)[source]

Compute the Jacobian of the dynamical system as sparce csc matrix

U_from_fields(fields, t=0)[source]

Get a fields and return the data as a single flat vector.

fields_from_U(U, fields, t=0)[source]

take a solution ad single flat vector and return structured fields.

fields_template(*args, **kwargs)[source]
init_simulation(fields, dt, t=0, tmax=None, id=None, hook=<function null_hook>, scheme='RODASPR', time_stepping=True, **kwargs)[source]

Init a skfdiff Simulation. See skfdiff.simulation.Simulation docstring for further details.

skfdiff.model.create_fields(coords, unks, pars)[source]

scikit-fdiff simulation

class skfdiff.simulation.PostProcess(name, function, description)

Bases: tuple

property description

Alias for field number 2

property function

Alias for field number 1

property name

Alias for field number 0

class skfdiff.simulation.Simulation(model, fields, dt, t=0, tmax=None, id=None, hook=<function null_hook>, scheme='RODASPR', time_stepping=True, **kwargs)[source]

Bases: object

High level container used to run simulation build on skfdiff Model. This object is an iterable which will yield every time step until the parameters ‘tmax’ is reached if provided. By default, the solver use a 6th order ROW solver, an implicit method with integrated time-stepping.

Parameters:
  • model (skfdiff.Model) – Contain finite difference approximation and routine of the dynamical system

  • fields (skfdiff.BaseFields or dict (any mappable)) – skfdiff container or mappable filled with initial conditions

  • dt (float) – time stepping for output. if time_stepping is False, the internal time stepping will be the same.

  • t (float, optional, default 0.) – initial time

  • tmax (float, optional, default None) – Control the end of the simulation. If None (the default), the com- putation will continue until interrupted by the user (using Ctrl-C or a SIGTERM signal).

  • id (None, optional) – Name of the simulation. A 2 word slug will be generated if not provided.

  • hook (callable, optional, default null_hook.) – Any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internal time step and can be used to include time dependent or conditionnal parameters, boundary conditions… The default null_hook has no impact on the computation.

  • scheme (callable, optional, default skfdiff.schemes.RODASPR) – An callable object which take the simulation state and return the next step. Its signature is scheme.__call__(fields, t, dt, hook) and it should return the next time and the updated fields. It take the model and extra positional and named arguments.

  • time_stepping (boolean, default True) – Indicate if the time step is controlled by an algorithm dependant of the temporal scheme (see the doc on time stepping for extra info).

  • **kwargs – extra arguments passed to the scheme.

dt

output time step

Type:

float

fields

skfdiff container filled with actual data

Type:

skfdiff.Fields

i

actual iteration

Type:

int

id

name of the simulation

Type:

str

model

skfdiff Model used in the simulation

Type:

skfdiff.Model

status

status of the simulation, one of the following one: (‘created’, ‘running’, ‘finished’, ‘failed’)

Type:

str

t

actual time

Type:

float

tmax

stopping time of the simulation. Not stopping if set to None.

Type:

float or None, default None

Examples

>>> import numpy as np
>>> import skfdiff
>>> model = skfdiff.Model(["k1 * dxxU",
...                        "k2 * dxxV"],
...                       ["U", "V"],
...                       ["k1", "k2"])
>>> x = np.linspace(0, 100, 1000, endpoint=False)
>>> U = np.cos(x * 2 * np.pi / 100)
>>> V = np.sin(x * 2 * np.pi / 100)
>>> fields = model.Fields(x=x, U=U, V=V)
>>> simulation = skfdiff.Simulation(model, fields, dt=5., tmax=50.)
>>> for t, fields in simulation:
...    pass
>>> print(t)
50.0
add_post_process(name, post_process, description='')[source]

add a post-process

Parameters:
  • name (str) – name of the post-traitment

  • post_process (callback (function of a class with a __call__ method) – or a streamz.Stream). this callback have to accept the simulation state as parameter and return the modifield simulation state. if a streamz.Stream is provided, it will me plugged_in with the previous streamz (and ultimately to the initial_stream). All these stream accept and return the simulation state.

  • description (str, optional, Default is "".) – give extra information about the post-processing

attach_container(path=None, save='all', mode='w', nbuffer=None, save_interval=None, force=False, backend='netcdf')[source]

add a Container to the simulation which allows some persistance to the simulation.

Parameters:
  • path (str or None (default: None)) – path for the container. If None (the default), the data lives only in memory (and are available with simulation.container)

  • mode (str, optional) – “a” or “w” (default “w”)

  • save (str, optional) – “all” will save every time-step, “last” will only get the last time step

  • nbuffer (int, optional) – wait until nbuffer data in the Queue before saving on disk.

  • save_interval (int, optional) – wait until save_interval since last flush before saving on disk.

  • force (bool, optional (default False)) – if True, remove the target folder if not empty. if False, raise an error.

compute()[source]

Generator which yield the actual state of the system every dt.

Yields:

tuple (t, fields) – Actual time and updated fields container.

property container

give access to the attached container, if any.

Type:

skfdiff.Container

property container_data

give access to the attached data in the attached container, if any.

Type:

skfdiff.Container

display(keys='unknowns', n=1, dim_allowed=(0, 1, 2))[source]
display_custom(plot_function, n=1)[source]
property plugins_timer

the cpu time of the plugins (post process, data save, display…).

Type:

skfdiff.core.simulation.Timer

property post_processes

list of skfdiff.core.simulation.PostProcess: contain all the post processing function attached to the simulation.

remove_post_process(name)[source]

remove a post-process

Parameters:

name (str) – name of the post-process to remove.

run(progress=True, verbose=False)[source]

Compute all steps of the simulation. Be careful: if tmax is not set, this function will result in an infinit loop.

Returns:

last time and result fields.

Return type:

(t, fields)

property stream

Streamz starting point, fed by the simulation state after each time_step. This interface is used for post-processing, saving the data on disk by the Container and display the fields in real-time.

Type:

streamz.Stream

property timer

the cpu time of the previous step and the total running time of the simulation.

Type:

skfdiff.core.simulation.Timer

class skfdiff.simulation.Timer(last, total)[source]

Bases: object

skfdiff.simulation.get_initial(total_iter, i)[source]
skfdiff.simulation.get_total_iter(tmax, user_dt)[source]
skfdiff.simulation.now(tz=None)

Returns new datetime object representing current time local to tz.

tz

Timezone object.

If no tz is specified, uses local timezone.

skfdiff.simulation.null_hook(t, fields)[source]
skfdiff.simulation.postprocess_save_timer(simul)[source]

scikit-fdiff temporal schemes

This module regroups all the implemented temporal schemes. They are written as callable class which take the model and some control arguments at the init, and perform a computation step every time they are called.

The following solvers are implemented:
  • Backward and Forward Euler, Crank-Nicolson method (with the Theta class)

  • Some Rosenbrock Wanner schemes (up to the 6th order) with time controler

  • All the scipy.integrate.ode integrators with the scipy_ode class.

class skfdiff.core.temporal_schemes.RODASPR(model, tol=0.1, time_stepping=True, max_iter=None, dt_min=None, safety_factor=0.9, solver='auto', recompute_target=True, iteratif_atol=0.001)[source]

Bases: ROW_general

6th order Rosenbrock scheme, with time stepping

Parameters:
  • model (skfdiff.Model) – skfdiff Model

  • tol (float, optional, default 1E-2) – tolerance factor for the time stepping. The time step will adapt to ensure that the maximum relative error on all fields stay under that value.

  • time_stepping (bool, optional, default True) – allow a variable internal time-step to ensure good agreement between computing performance and accuracy.

  • max_iter (float or None, optional, default None) – maximum internal iteration allowed

  • dt_min (float or None, optional, default None) – minimum internal time step allowed

  • recompute_target (bool, optional, default False) – if True a new computation is done when the target time is exceeded, interpolation used otherwise.

embeded_timestepping = True
name = 'RODASPR'
class skfdiff.core.temporal_schemes.ROS2(model)[source]

Bases: ROW_general

Second order Rosenbrock scheme, without time stepping

Parameters:

model (skfdiff.Model) – skfdiff Model

embeded_timestepping = False
name = 'ROS2'
class skfdiff.core.temporal_schemes.ROS3PRL(model, tol=0.1, time_stepping=True, max_iter=None, dt_min=None, safety_factor=0.9, solver='auto', recompute_target=True, iteratif_atol=0.001)[source]

Bases: ROW_general

4th order Rosenbrock scheme, with time stepping

Parameters:
  • model (skfdiff.Model) – skfdiff Model

  • tol (float, optional, default 1E-2) – tolerance factor for the time stepping. The time step will adapt to ensure that the maximum relative error on all fields stay under that value.

  • time_stepping (bool, optional, default True) – allow a variable internal time-step to ensure good agreement between computing performance and accuracy.

  • max_iter (float or None, optional, default None) – maximum internal iteration allowed

  • dt_min (float or None, optional, default None) – minimum internal time step allowed

  • recompute_target (bool, optional, default False) – if True a new computation is done when the target time is exceeded, interpolation used otherwise.

embeded_timestepping = True
name = 'ROS3PRL'
class skfdiff.core.temporal_schemes.ROS3PRw(model, tol=0.1, time_stepping=True, max_iter=None, dt_min=None, safety_factor=0.9, solver='auto', recompute_target=True, iteratif_atol=0.001)[source]

Bases: ROW_general

Third order Rosenbrock scheme, with time stepping

Parameters:
  • model (skfdiff.Model) – skfdiff Model

  • tol (float, optional, default 1E-2) – tolerance factor for the time stepping. The time step will adapt to ensure that the maximum relative error on all fields stay under that value.

  • time_stepping (bool, optional, default True) – allow a variable internal time-step to ensure good agreement between computing performance and accuracy.

  • max_iter (float or None, optional, default None) – maximum internal iteration allowed

  • dt_min (float or None, optional, default None) – minimum internal time step allowed

  • recompute_target (bool, optional, default False) – if True a new computation is done when the target time is exceeded, interpolation used otherwise.

embeded_timestepping = True
name = 'ROS3PRw'
class skfdiff.core.temporal_schemes.ROW_general(model, alpha, gamma, b, b_pred=None, time_stepping=False, tol=None, max_iter=None, dt_min=None, safety_factor=0.9, solver='auto', recompute_target=True, iteratif_atol=0.001, initial_dt=None)[source]

Bases: TemporalScheme

Rosenbrock Wanner class of temporal solvers

The implementation and the different parameters can be found in http://www.digibib.tu-bs.de/?docid=00055262

class skfdiff.core.temporal_schemes.Stationnary(model, solver=<function spsolve>)[source]

Bases: object

Solve the stationnary problem F(U) = 0

Parameters:
  • model (skfdiff.Model) – skfdiff Model

  • solver (callable, optional, default scipy.sparse.linalg.spsolve) – method able to solve a Ax = b linear equation with A a sparse matrix. Take A and b as argument and return x.

class skfdiff.core.temporal_schemes.TemporalScheme[source]

Bases: object

class skfdiff.core.temporal_schemes.Theta(model, theta=1, solver=<function spsolve>)[source]

Bases: TemporalScheme

Simple theta-based scheme where theta is a weight

if theta = 0, the scheme is a forward-euler scheme if theta = 1, the scheme is a backward-euler scheme if theta = 0.5, the scheme is called a Crank-Nicolson scheme

Parameters:
  • model (skfdiff.Model) – skfdiff Model

  • theta (int, optional, default 1) – weight of the theta-scheme

  • solver (callable, optional, default scipy.sparse.linalg.spsolve) – method able to solve a Ax = b linear equation with A a sparse matrix. Take A and b as argument and return x.

embeded_timestepping = False
name = 'theta'
skfdiff.core.temporal_schemes.add_time_stepping(scheme, tol=0.01, ord=2, m=20, reject_factor=2)[source]
skfdiff.core.temporal_schemes.get_temporal_scheme(name)[source]

get a temporal_scheme by its name

Parameters:

name (name {str} -- temporal_scheme) –

Raises:

NotImplementedError -- raised if the temporal_scheme is not available.

Returns:

TemporalScheme – the requested temporal_scheme

skfdiff.core.temporal_schemes.null_hook(t, fields)[source]
skfdiff.core.temporal_schemes.register_temporal_scheme(CustomTemporalScheme)[source]
class skfdiff.core.temporal_schemes.scipy_ivp(model, use_jac=True, method='RK45', **integrator_kwargs)[source]

Bases: TemporalScheme

Proxy written around the scipy.integrate.solve_ivp function. Give access to all the scipy integrators.

Parameters:
  • model (skfdiff.Model) – skfdiff Model

  • method (str, optional, default 'RK45') – name of the chosen scipy integration scheme.

  • **integrator_kwargs – extra arguments provided to the scipy integration scheme.

embeded_timestepping = True
name = 'scipy_ivp'