Simulation¶
The Simulation
class is the highest level object in scikit-fdiff
library. It aim to control the solving of an IBVP (Initial Boundary Values
Problem) in a as simple way as possible. It is supposed to be simple to use,
yet giving the user some useful goodies to help to handle extra steps. It
offers a stream interface to allow complex custom pipeline to be built, as well
as useful common operation as retaining the fields at each timestep (in memory
or on disk), or plotting the fields during the solving
(see the Stream section).
As the Model
deal with the spatial discretisation of the
problem, the Simulation
are about doing the temporal update
of the unknowns.
At minima, you have to provide a Model
, an initial Fields
and a time step dt
. You can supply the initial time t
, and
the end of the simulation t_max
. Without the t_max
parameter, the simulation will run forever.
>>> import numpy as np
>>> from skfdiff import Model, Simulation
>>> model = Model("k * (dxxT + dyyT)", "T(x, y)", parameters="k")
>>> x = np.linspace(-np.pi, np.pi, 56)
>>> y = np.linspace(-np.pi, np.pi, 56)
>>> xx, yy = np.meshgrid(x, y, indexing="ij")
>>> T = np.sqrt(xx ** 2 + yy ** 2)
>>> initial_fields = model.Fields(x=x, y=y, T=T, k=1)
The two following lines have the same effect :
simulation = Simulation(model, initial_fields, dt=.25)
simulation = model.init_simulation(initial_fields, dt=.25)
As the t_max
is not set, the simulation will run until the user
interrupt the script. You can run a simulation until the end with
Simulation.run()
:
simulation = model.init_simulation(initial_fields, dt=.25, tmax=1)
simulation.run()
The Simulation
is also an iterator, which each iteration leads to the
next time step (and its updated fields). You can run the simulation by
iterating on the Simulation
object.
for t, fields in simulation.run():
pass
That later is useful as it allows the user to process the updated fields very easily, without the complexity of the streams.
for t, fields in simulation.run():
do_stuff_with_the_fields(t, fields)
You can specify the temporal_scheme
(as described in
the dedicated section) and its parameters. According
to the time_stepping
parameters, the temporal schemes will take the
user dt
as internal time step, or use an error estimation to compute
the internal time-step (which is the default behaviour, and strongly
recommended).
simulation = model.init_simulation(initial_fields, dt=.25, tmax=1,
scheme="Theta", # using a theta scheme
theta=0.5, # use theta=0.5 for the temporal scheme
time_stepping=False) # the internal time step will be fixed to 0.25
simulation.run()
Temporal schemes¶
As their name indicate, temporal schemes deal with temporal evolution of the systems. They are ODE (ordinary differential equation) solvers that will solve the initial value problem obtained after spatial discretisation of the primitive PDE.
They presents themselves as callable objects that instantiated with a model and
optional parameters, and which each call take (time, fields)
as
arguments to return (updated_time, updated_fields)
.
The objective is to translate
into a discretised update of the unknowns \(U\). If we obtain a direct relation between the unknowns at the next time step and the actual time
the scheme is explicit. This is the easiest to solve, but have strong limitation on the time step \(\Delta t\).
Otherwise, if the discretisation leads to a linear system as
the system is implicit. This means that we have to solve the (possible huge) linear system, which is a computation expansive task. It is interesting to note that an evolution equation \(F^{n+1} = F(U^{n+1})\) can be rewritten thanks to the jacobian as
The Jacobian matrix is strongly linked to implicit schemes. As it is possible
to approximate that jacobian matrix, scikit-fdiff
can efficiently
compute it (see the Model section).
Theta schemes (Forward, Backward Euler and Crank-Nicolson)¶
They are the simplest schemes that exists. The forward (resp. backward) Euler schemes are obtained with first order Taylor expansion, evaluated at \(t + \Delta t\) (resp. \(t - \Delta t\)).
The first one reads
The later
This later is implicit and has to be rewritten using the Jacobian matrix as described earlier.
We can also use a weighted average of both extrema. Instead of evaluating the differential at the bounds, we use a median point. In that case, we can write
\(\theta\) being bounded between 0 and 1. \(\theta = 0\) conduct to a forward Euler scheme, \(\theta = 1\) to a backward Euler scheme, and \(\theta = 0.5\) to a Crank-Nicolson scheme.
You can use these scheme with the Theta
scheme.
simulation = model.init_simulation(initial_fields, dt=.25, tmax=1,
scheme="Theta", # using a theta scheme
theta=0.5) # use theta=0.5 for Crank-Nicolson. Default is 1.
simulation.run()
scipy solve_ivp wrapper¶
Scipy provide a range of ODE solvers. This scheme is a simple wrapper that give
access to the scipy.integrate.solve_ivp
routine. By default, this
will use a Runge-Kutta 4/5 (explicit) scheme, but you can select your
integrator and change the solver parameters easily.
Be careful, as these routine are not well optimized for large linear system you can obtain after PDE discretisation.
Rosenbrock-Wanner schemes¶
They are the default scikit-fdiff schemes. They are Implicit Runge Kutta scheme written to have a common left-hand-side. That way, only one Jacobian evaluation is needed, as well as only one system factorisation per step.
This allow to have access to high order temporal solver at reasonable cost. Because they are Runge-Kutta schemes, they come with computation-free error estimation, giving access to time-stepping control without extra cost.
The description of the scheme and their coefficient can be found here.
A word on automatic time-stepping¶
The implemented time-stepping will use an error estimation. It should ensure that error stay behind a tolerance factor.
If some schemes come with a built-in error estimation (as the Rosenbrock-Wanner schemes), most of them lack this feature. In that case, the error is estimated by doing a comparison between a large time step and N smaller time-steps. If this ensure that the error stay bounded, it come at the cost of extra computation.
Hook¶
Sometime, you have to introduce non-linear effect that can note be easily
described via the scikit-fdiff mathematical notation. In that case, you can
use a hook
.
This is a python function that will modify the fields
every time a
routine is called.
As example, you can use it to have a time-dependant dirichlet condition:
import numpy as np
from skfdiff import Model, Simulation
model = Model("k * dxxT", "T(x)", parameters="k",
boundary_conditions={("T", "x"): ("dirichlet", "noflux")})
x = np.linspace(-np.pi, np.pi, 256)
T = np.zeros_like(x)
initial_fields = model.Fields(x=x, T=T, k=1)
def time_dependant_dirichlet(t, fields):
fields["T"][0] = np.cos(t)
return fields
simulation = model.init_simulation(initial_fields, dt=.25, tmax=1,
hook=time_dependant_dirichlet)
simulation.run()
Be aware that means that you introduce a (possible) strong non-linearity that will not be implicit. This will lead to small time-step, and should be used as last resort. The same result could be obtained with an “artificial” Robin condition :
import numpy as np
from skfdiff import Model, Simulation
model = Model("k * dxxT", "T(x)", parameters="k",
boundary_conditions={("T", "x"): ("dxU - 1E6 * (U - cos(t))",
"noflux")})
x = np.linspace(-np.pi, np.pi, 256)
T = np.zeros_like(x)
initial_fields = model.Fields(x=x, T=T, k=1)
simulation = model.init_simulation(initial_fields, dt=.25, tmax=1)
simulation.run()
Stream Interface¶
The simulations are linked to a Streamz.Stream
, and will emit itself
at every user time step. This can be used to add extra step after each
time-step, as seen in that toy case :
simulation = model.init_simulation(initial_fields, dt=.25, tmax=1)
simulation.stream.map(lambda simul: simul.t).sink(print)
simulation.run(progress=False)
In that case, we will get the time of the simulation and print it every step.
You can obviously build a more complex pipeline. For that, you can refer to the Streamz documentation.
Some useful goodies have been implemented in the scikit-fdiff core, as described into the next paragraphs.
Post-Process¶
It is possible to add a callable that will be called after every step with
the add_post_process
, which will take the name of the post process
as well as a python function. This function will take the simulation itself as
argument : every modification done to the simulation will impact the streams.
The most common use is to add extra value to the fields, which could feed the container or the display.
>>> def compute_flux(simul):
... dxT = simul.fields["T"].differentiate("x")
... dyT = simul.fields["T"].differentiate("y")
... magnitude = np.sqrt(dxT**2 + dyT**2)
... simul.fields["dxT"] = dxT
... simul.fields["dyT"] = dyT
... simul.fields["mag"] = magnitude
>>> simulation = model.init_simulation(initial_fields, dt=.25)
>>> simulation.add_post_process("fluxs", compute_flux)
>>> list(simulation.fields.variables)
['T', 'k', 'x', 'y', 'dxT', 'dyT', 'mag']
Display¶
Scikit-fdiff come with a real time display of the fields during the simulation. For now, this display is built to work inside a jupyter notebook (support for non-interactive environment is planned for version 0.7).
Be aware that that display work in the main loop, and will slow down the simulation (this should improved in the 0.7). It is designed to help the user when prototyping, when a lot of different run with different parameters can be needed. It permit to quickly see how the unknowns evolves and update the simulation parameters as quick as the problem happens.
This display is built upon Holoviews, and all that can be applied to holoviews plots can applied on scikit-fdiff display.
from skfdiff import enable_notebook
enable_notebook()
display = simulation.display(["T", "mag"])
display # this line has to be at the end of a Jupyter cell
By default, everything but parameters will be displayed if they
are between 0D, 1D or 2D. You can supply a list of keys to control what will
be displayed. The display.hv_curve
property return the underlying
holoviews layout if the user want finer control on the display.
You can also build a custom visualisation by providing a callback that will return a Holoviews element.
import holoviews as hv
def plot_slices(simul):
fields = simul.fields
slices_positions = [-3, 0, 3]
curves = []
for slice_ in fields["T"].sel(x=slices_positions, method="nearest"):
curves.append(hv.Curve(slice_, label="x: %g" % slice_.x))
return hv.Overlay(curves).opts(hv.opts.Curve(width=600))
display = simulation.display_custom(plot_slices)
display # this line has to be at the end of a Jupyter cell
Container¶
A container is an object that will retain the fields across the time-steps. It can be done in memory, so the user can access to all time-step to keep the temporal evolution of the unknowns, or persist on disk to retrieve the unknowns between two session.
>>> import numpy as np
>>> from skfdiff import Model, Simulation
>>> model = Model("k * (dxxT + dyyT) - c * dxT", "T(x, y)",
... parameters=["k", "c"], boundary_conditions="periodic")
>>> x = np.linspace(-np.pi, np.pi, 56)
>>> y = np.linspace(-np.pi, np.pi, 56)
>>> xx, yy = np.meshgrid(x, y, indexing="ij")
>>> T = np.cos(xx) * np.sin(yy)
>>> initial_fields = model.Fields(x=x, y=y, T=T, k=1E-2, c=1)
>>> simulation = model.init_simulation(initial_fields, dt=.25, tmax=5)
>>> simulation.add_post_process("fluxs", compute_flux)
>>> container = simulation.attach_container() # by default, add a in-memory container
>>> t, fields = simulation.run()
>>> container.data.t.values.tolist()
[0.0, ..., 5.0]
>>> container.data["T"].sel(x=0, method="nearest").plot()
<matplotlib.collections.QuadMesh ...>
If you need to save the fields on disk, you can do it easily:
>>> from skfdiff import retrieve_container
>>> simulation = model.init_simulation(initial_fields, dt=.25, tmax=5)
>>> simulation.add_post_process("fluxs", compute_flux)
>>> container = simulation.attach_container("/tmp/output") # by default, add a in-memory container
>>> t, fields = simulation.run()
>>> retrieved_container = retrieve_container("/tmp/output/%s" % simulation.id)
These data are stored in a netcdf file, which can be opened with software like Panoply.
They are stored by chunks containing some iteration, that can be hard to process by external software. You can merge these files to make that easier.
>>> container.merge()
Path('...merged_data.nc')