Simulation

The Simulation class is the highest level object in scikit-fdiff library. It aim to control the solving of an IBVP (Initial Boundary Values Problem) in a as simple way as possible. It is supposed to be simple to use, yet giving the user some useful goodies to help to handle extra steps. It offers a stream interface to allow complex custom pipeline to be built, as well as useful common operation as retaining the fields at each timestep (in memory or on disk), or plotting the fields during the solving (see the Stream section).

As the Model deal with the spatial discretisation of the problem, the Simulation are about doing the temporal update of the unknowns.

At minima, you have to provide a Model, an initial Fields and a time step dt. You can supply the initial time t, and the end of the simulation t_max. Without the t_max parameter, the simulation will run forever.

>>> import numpy as np
>>> from skfdiff import Model, Simulation
>>> model = Model("k * (dxxT + dyyT)", "T(x, y)", parameters="k")
>>> x = np.linspace(-np.pi, np.pi, 56)
>>> y = np.linspace(-np.pi, np.pi, 56)
>>> xx, yy = np.meshgrid(x, y, indexing="ij")
>>> T = np.sqrt(xx ** 2 + yy ** 2)
>>> initial_fields = model.Fields(x=x, y=y, T=T, k=1)

The two following lines have the same effect :

simulation = Simulation(model, initial_fields, dt=.25)
simulation = model.init_simulation(initial_fields, dt=.25)

As the t_max is not set, the simulation will run until the user interrupt the script. You can run a simulation until the end with Simulation.run():

simulation = model.init_simulation(initial_fields, dt=.25, tmax=1)
simulation.run()

The Simulation is also an iterator, which each iteration leads to the next time step (and its updated fields). You can run the simulation by iterating on the Simulation object.

for t, fields in simulation.run():
    pass

That later is useful as it allows the user to process the updated fields very easily, without the complexity of the streams.

for t, fields in simulation.run():
    do_stuff_with_the_fields(t, fields)

You can specify the temporal_scheme (as described in the dedicated section) and its parameters. According to the time_stepping parameters, the temporal schemes will take the user dt as internal time step, or use an error estimation to compute the internal time-step (which is the default behaviour, and strongly recommended).

simulation = model.init_simulation(initial_fields, dt=.25, tmax=1,
                                   scheme="Theta",  # using a theta scheme
                                   theta=0.5,  # use theta=0.5 for the temporal scheme
                                   time_stepping=False)  # the internal time step will be fixed to 0.25
simulation.run()

Temporal schemes

As their name indicate, temporal schemes deal with temporal evolution of the systems. They are ODE (ordinary differential equation) solvers that will solve the initial value problem obtained after spatial discretisation of the primitive PDE.

They presents themselves as callable objects that instantiated with a model and optional parameters, and which each call take (time, fields) as arguments to return (updated_time, updated_fields).

The objective is to translate

\[\partial_t U = F(U)\]

into a discretised update of the unknowns \(U\). If we obtain a direct relation between the unknowns at the next time step and the actual time

\[U^{n + 1} = g(U^{n})\]

the scheme is explicit. This is the easiest to solve, but have strong limitation on the time step \(\Delta t\).

Otherwise, if the discretisation leads to a linear system as

\[U^{n + 1} = g(U^{n})\]

the system is implicit. This means that we have to solve the (possible huge) linear system, which is a computation expansive task. It is interesting to note that an evolution equation \(F^{n+1} = F(U^{n+1})\) can be rewritten thanks to the jacobian as

\[F^{n+1} = F^{n} + J^{n}\,(U^{n+1} - U^{n})\]

The Jacobian matrix is strongly linked to implicit schemes. As it is possible to approximate that jacobian matrix, scikit-fdiff can efficiently compute it (see the Model section).

Theta schemes (Forward, Backward Euler and Crank-Nicolson)

They are the simplest schemes that exists. The forward (resp. backward) Euler schemes are obtained with first order Taylor expansion, evaluated at \(t + \Delta t\) (resp. \(t - \Delta t\)).

The first one reads

\[U^{n+1} = F^{n} - U^{n} \, \Delta t\]

The later

\[U^{n+1} = F^{n + 1} - U^{n} \, \Delta t = F^{n} + J^{n}\,(U^{n+1} - U^{n}) - U^{n} \, \Delta t\]

This later is implicit and has to be rewritten using the Jacobian matrix as described earlier.

We can also use a weighted average of both extrema. Instead of evaluating the differential at the bounds, we use a median point. In that case, we can write

\[U^{n+1} = \theta F^{n + 1} + (1 - \theta) F^{n} - U^{n} \, \Delta t\]

\(\theta\) being bounded between 0 and 1. \(\theta = 0\) conduct to a forward Euler scheme, \(\theta = 1\) to a backward Euler scheme, and \(\theta = 0.5\) to a Crank-Nicolson scheme.

You can use these scheme with the Theta scheme.

simulation = model.init_simulation(initial_fields, dt=.25, tmax=1,
                                   scheme="Theta",  # using a theta scheme
                                   theta=0.5)  # use theta=0.5 for Crank-Nicolson. Default is 1.
simulation.run()

scipy solve_ivp wrapper

Scipy provide a range of ODE solvers. This scheme is a simple wrapper that give access to the scipy.integrate.solve_ivp routine. By default, this will use a Runge-Kutta 4/5 (explicit) scheme, but you can select your integrator and change the solver parameters easily.

Be careful, as these routine are not well optimized for large linear system you can obtain after PDE discretisation.

Rosenbrock-Wanner schemes

They are the default scikit-fdiff schemes. They are Implicit Runge Kutta scheme written to have a common left-hand-side. That way, only one Jacobian evaluation is needed, as well as only one system factorisation per step.

This allows to have access to high order temporal solver at reasonable cost. Because they are Runge-Kutta schemes, they come with computation-free error estimation, giving access to time-stepping control without extra cost.

The description of the scheme and their coefficient can be found here.

A word on automatic time-stepping

The implemented time-stepping will use an error estimation. It should ensure that error stays below a tolerance factor.

If some schemes come with a built-in error estimation (as the Rosenbrock-Wanner schemes), most of them lack this feature. In that case, the error is estimated by doing a comparison between a large time step and N smaller time-steps. While this ensures that the error is bound, it comes at the cost of extra computation.

Hook

Sometime, you have to introduce non-linear effect that can not be easily described via the scikit-fdiff mathematical notation. In that case, you can use a hook.

This is a python function that will modify the fields every time a routine is called.

As example, you can use it to have a time-dependant dirichlet condition:

import numpy as np
from skfdiff import Model, Simulation
model = Model("k * dxxT", "T(x)", parameters="k",
              boundary_conditions={("T", "x"): ("dirichlet", "noflux")})

x = np.linspace(-np.pi, np.pi, 256)
T = np.zeros_like(x)
initial_fields = model.Fields(x=x, T=T, k=1)

def time_dependant_dirichlet(t, fields):
    fields["T"][0] = np.cos(t)
    return fields

simulation = model.init_simulation(initial_fields, dt=.25, tmax=1,
                                   hook=time_dependant_dirichlet)
simulation.run()

Be aware that means that you introduce a (possible) strong non-linearity that will not be implicit. This will lead to small time-steps, and should be used as last resort. The same result could be obtained with an “artificial” Robin condition :

import numpy as np
from skfdiff import Model, Simulation
model = Model("k * dxxT", "T(x)", parameters="k",
              boundary_conditions={("T", "x"): ("dxU - 1E6 * (U - cos(t))",
                                                "noflux")})

x = np.linspace(-np.pi, np.pi, 256)
T = np.zeros_like(x)
initial_fields = model.Fields(x=x, T=T, k=1)

simulation = model.init_simulation(initial_fields, dt=.25, tmax=1)
simulation.run()

Stream Interface

The simulations are linked to a Streamz.Stream, and will emit itself at every user time step. This can be used to add extra step after each time-step, as seen in that toy case :

simulation = model.init_simulation(initial_fields, dt=.25, tmax=1)
simulation.stream.map(lambda simul: simul.t).sink(print)
simulation.run(progress=False)

In that case, we will get the time of the simulation and print it every step.

You can obviously build a more complex pipeline. For that, you can refer to the Streamz documentation.

Some useful goodies have been implemented in the scikit-fdiff core, as described in the next paragraphs.

Post-Process

It is possible to add a callable that will be called after every step with the add_post_process, which will take the name of the post process as well as a python function. This function will take the simulation itself as argument : every modification done to the simulation will impact the streams.

The most common use is to add extra value to the fields, which could feed the container or the display.

>>> def compute_flux(simul):
...     dxT = simul.fields["T"].differentiate("x")
...     dyT = simul.fields["T"].differentiate("y")
...     magnitude = np.sqrt(dxT**2 + dyT**2)
...     simul.fields["dxT"] = dxT
...     simul.fields["dyT"] = dyT
...     simul.fields["mag"] = magnitude

>>> simulation = model.init_simulation(initial_fields, dt=.25)
>>> simulation.add_post_process("fluxs", compute_flux)
>>> list(simulation.fields.variables)
['T', 'k', 'x', 'y', 'dxT', 'dyT', 'mag']

Display

Scikit-fdiff comes with a real time display of the fields during the simulation. For now, this display is built to work inside a Jupyter notebook (support for non-interactive environment is planned for version 0.7).

Be aware that that display executes in the main loop, and will slow down the simulation (this should improve in version 0.7). It is designed to help the user when prototyping; when a lot of different runs with different parameters need to be explored. The feature permits to quickly see how the unknowns evolves and update the simulation parameters immediately and iteratively as the problem happens.

This display is built upon Holoviews, and all that can be applied to holoviews plots can applied on scikit-fdiff display.

from skfdiff import enable_notebook
enable_notebook()
display = simulation.display(["T", "mag"])
display  # this line has to be at the end of a Jupyter cell

By default, everything but parameters will be displayed if they are between 0D, 1D or 2D. You can supply a list of keys to control what will be displayed. The display.hv_curve property return the underlying holoviews layout if the user wants finer control on the display.

You can also build a custom visualisation by providing a callback that will return a Holoviews element.

import holoviews as hv

def plot_slices(simul):
    fields = simul.fields
    slices_positions = [-3, 0, 3]
    curves = []
    for slice_ in fields["T"].sel(x=slices_positions, method="nearest"):
        curves.append(hv.Curve(slice_, label="x: %g" % slice_.x))
    return hv.Overlay(curves).opts(hv.opts.Curve(width=600))

display = simulation.display_custom(plot_slices)
display  # this line has to be at the end of a Jupyter cell

Container

A container is an object that will retain the fields across the time-steps. It can be kept in memory, so the user can access to all time-step to keep the temporal evolution of the unknowns, or persist on disk to retrieve the unknowns between two session.

>>> import numpy as np
>>> from skfdiff import Model, Simulation
>>> model = Model("k * (dxxT + dyyT) - c * dxT", "T(x, y)",
...               parameters=["k", "c"], boundary_conditions="periodic")

>>> x = np.linspace(-np.pi, np.pi, 56)
>>> y = np.linspace(-np.pi, np.pi, 56)
>>> xx, yy = np.meshgrid(x, y, indexing="ij")
>>> T = np.cos(xx) * np.sin(yy)
>>> initial_fields = model.Fields(x=x, y=y, T=T, k=1E-2, c=1)
>>> simulation = model.init_simulation(initial_fields, dt=.25, tmax=5)
>>> simulation.add_post_process("fluxs", compute_flux)
>>> container = simulation.attach_container()  # by default, add a in-memory container
>>> t, fields = simulation.run()
>>> container.data.t.values.tolist()
[0.0, ..., 5.0]
>>> container.data["T"].sel(x=0, method="nearest").plot()
<matplotlib.collections.QuadMesh ...>

If you need to save the fields on disk, you can do it easily:

>>> from skfdiff import retrieve_container
>>> simulation = model.init_simulation(initial_fields, dt=.25, tmax=5)
>>> simulation.add_post_process("fluxs", compute_flux)
>>> container = simulation.attach_container("/tmp/output")  # by default, add a in-memory container
>>> t, fields = simulation.run()
>>> retrieved_container = retrieve_container("/tmp/output/%s" % simulation.id)

These data are stored in a netcdf file, which can be opened with software like Panoply.

They are stored by chunks containing some iteration, that can be hard to process by external software. You can merge these files to make that easier.

>>> container.merge()
Path('...merged_data.nc')