Source code for skfdiff.simulation

#!/usr/bin/env python
# coding=utf-8

import inspect
import time
import warnings
from collections import namedtuple
from datetime import datetime, timedelta
from uuid import uuid4

import cloudpickle
import streamz
from loguru import logger
from numpy import isclose
import xarray as xr

from .plugins.container import MemoryContainer, NetCDFContainer, ZarrContainer
from .plugins.displays import Display
from .utils import tqdm
from .core.temporal_schemes import add_time_stepping, get_temporal_scheme

now = datetime.now


[docs]class Timer: def __init__(self, last, total): self.last = last self.total = total def __repr__(self): repr = """last: {last} total: {total}""" return repr.format( last=(now() - timedelta(seconds=self.last)), total=(now() - timedelta(seconds=self.total)), )
[docs]def null_hook(t, fields): return fields
[docs]def get_total_iter(tmax, user_dt): if tmax: return tmax // user_dt return None
[docs]def get_initial(total_iter, i): if total_iter and i < total_iter: return i if total_iter: return total_iter return 0
[docs]def postprocess_save_timer(simul): simul.fields["last_run_time"] = simul.timer.last simul.fields["total_run_time"] = simul.timer.total
PostProcess = namedtuple("PostProcess", ["name", "function", "description"]) def _reduce_simulation( model, fields, dt, t, tmax, i, id, _pprocesses, _scheme, status, _total_running, _last_running, _created_timestamp, _started_timestamp, _last_timestamp, _actual_timestamp, _hook, _container, ): simul = Simulation(model=model, fields=fields, dt=dt, t=t, tmax=tmax, id=id) simul.i = i simul._pprocesses = cloudpickle.loads(_pprocesses) simul._scheme = cloudpickle.loads(_scheme) simul.status = status simul._total_running = _total_running simul._last_running = _last_running simul._created_timestamp = _created_timestamp simul._started_timestamp = _started_timestamp simul._last_timestamp = _last_timestamp simul._actual_timestamp = _actual_timestamp simul._hook = _hook simul._container = _container return simul
[docs]class Simulation(object): """High level container used to run simulation build on skfdiff Model. This object is an iterable which will yield every time step until the parameters 'tmax' is reached if provided. By default, the solver use a 6th order ROW solver, an implicit method with integrated time-stepping. Parameters ---------- model : skfdiff.Model Contain finite difference approximation and routine of the dynamical system fields : skfdiff.BaseFields or dict (any mappable) skfdiff container or mappable filled with initial conditions dt : float time stepping for output. if time_stepping is False, the internal time stepping will be the same. t : float, optional, default 0. initial time tmax : float, optional, default None Control the end of the simulation. If None (the default), the com- putation will continue until interrupted by the user (using Ctrl-C or a SIGTERM signal). id : None, optional Name of the simulation. A 2 word slug will be generated if not provided. hook : callable, optional, default null_hook. Any callable taking the actual time, fields and parameters and return modified fields and parameters. Will be called every internal time step and can be used to include time dependent or conditionnal parameters, boundary conditions... The default null_hook has no impact on the computation. scheme : callable, optional, default skfdiff.schemes.RODASPR An callable object which take the simulation state and return the next step. Its signature is scheme.__call__(fields, t, dt, hook) and it should return the next time and the updated fields. It take the model and extra positional and named arguments. time_stepping : boolean, default True Indicate if the time step is controlled by an algorithm dependant of the temporal scheme (see the doc on time stepping for extra info). **kwargs extra arguments passed to the scheme. Attributes ---------- dt : float output time step fields : skfdiff.Fields skfdiff container filled with actual data i : int actual iteration id : str name of the simulation model : skfdiff.Model skfdiff Model used in the simulation status : str status of the simulation, one of the following one: ('created', 'running', 'finished', 'failed') t : float actual time tmax : float or None, default None stopping time of the simulation. Not stopping if set to None. Examples -------- >>> import numpy as np >>> import skfdiff >>> model = skfdiff.Model(["k1 * dxxU", ... "k2 * dxxV"], ... ["U", "V"], ... ["k1", "k2"]) >>> x = np.linspace(0, 100, 1000, endpoint=False) >>> U = np.cos(x * 2 * np.pi / 100) >>> V = np.sin(x * 2 * np.pi / 100) >>> fields = model.Fields(x=x, U=U, V=V) >>> simulation = skfdiff.Simulation(model, fields, dt=5., tmax=50.) >>> for t, fields in simulation: ... pass >>> print(t) 50.0 """ # noqa def __init__( self, model, fields, dt, t=0, tmax=None, id=None, hook=null_hook, scheme="RODASPR", time_stepping=True, **kwargs ): def intersection_kwargs(kwargs, function): """Inspect the function signature to identify the relevant keys in a dictionary of named parameters. """ func_signature = inspect.signature(function) func_parameters = func_signature.parameters kwargs = { key: value for key, value in kwargs.items() if key in func_parameters } return kwargs kwargs["time_stepping"] = time_stepping self.id = str(uuid4())[:6] if not id else id self.model = model if not isinstance(fields, xr.Dataset): fields = model.Fields(**{var: fields[var] for var in fields.keys()}) self.fields = fields self.t = t self.user_dt = self.dt = dt self.tmax = tmax self.i = 0 self._stream = streamz.Stream() self._pprocesses = [] scheme = get_temporal_scheme(scheme) self._scheme = scheme(model, **intersection_kwargs(kwargs, scheme.__init__)) if time_stepping and not self._scheme.embeded_timestepping: self._scheme = add_time_stepping( self._scheme, **intersection_kwargs(kwargs, add_time_stepping) ) self.status = "created" self._total_running = 0 self._last_running = 0 self._total_plugins_running = 0 self._last_plugins_running = 0 self._created_timestamp = now() self._started_timestamp = None self._last_timestamp = None self._actual_timestamp = now() self._hook = hook self._container = None self._iterator = self.compute() def _compute_one_step(self, t, fields): """ Compute one step of the simulation, then update the timers. """ fields = self._hook(t, fields) self.dt = self.tmax - t if self.tmax and (t + self.dt >= self.tmax) else self.dt before_compute = time.process_time() t, fields = self._scheme(t, fields, self.dt, hook=self._hook) after_compute = time.process_time() self._last_running = after_compute - before_compute self._total_running += self._last_running self._last_timestamp = self._actual_timestamp self._actual_timestamp = now() return t, fields
[docs] def compute(self): """Generator which yield the actual state of the system every dt. Yields ------ tuple : t, fields Actual time and updated fields container. """ fields = self.fields t = self.t self._started_timestamp = now() self.stream.emit(self) if self.tmax and isclose(self.t, self.tmax): self._end_simulation() return try: while True: t, fields = self._compute_one_step(t, fields) self.i += 1 self.t = t self.fields = fields for pprocess in self.post_processes: pprocess.function(self) before_emit = time.time() self.stream.emit(self) after_emit = time.time() self._last_plugins_running = after_emit - before_emit self._total_plugins_running += self._last_plugins_running yield self.t, self.fields if self.tmax and isclose(self.t, self.tmax): self._end_simulation() return except RuntimeError: self.status = "failed" raise
def _end_simulation(self): if self.container: self.container.flush() def _run_with_progress(self, total_iter, initial, log): with tqdm(initial=initial, total=total_iter) as pbar: for t, fields in self: pbar.update(1) pbar.set_description("%s running: t: %g" % (self.id, t)) log("%s running: t: %g" % (self.id, t)) try: return t, fields except UnboundLocalError: warnings.warn("Simulation already ended") def _run_without_progress(self, log): for t, fields in self: log("%s running: t: %g" % (self.id, t)) try: return t, fields except UnboundLocalError: warnings.warn("Simulation already ended")
[docs] def run(self, progress=True, verbose=False): """Compute all steps of the simulation. Be careful: if tmax is not set, this function will result in an infinit loop. Returns ------- (t, fields): last time and result fields. """ log = logger.info if verbose else logger.debug total_iter = get_total_iter(self.tmax, self.user_dt) initial = get_initial(total_iter, self.i) if progress: return self._run_with_progress(total_iter, initial, log) return self._run_without_progress(log)
def __repr__(self): repr = """{simulation_name:=^30} created: {created_date} started: {started_date} last: {last_date} time: {t:g} iteration: {iter:g} last step: {step_time} total time: {running_time} Hook function ------------- {hook_source} Container --------- {container} =========== Model =========== {model_repr}""" repr = repr.format( simulation_name=" %s " % self.id, container=self.container, t=self.t, iter=self.i, model_repr=self.model, hook_source=inspect.getsource(self._hook), step_time=self._last_running, running_time=self._total_running, created_date=(self._created_timestamp), started_date=( self._started_timestamp if self._started_timestamp else "None" ), last_date=(self._last_timestamp if self._last_timestamp else "None"), ) return repr
[docs] def attach_container( self, path=None, save="all", mode="w", nbuffer=None, save_interval=None, force=False, backend="netcdf", ): """add a Container to the simulation which allows some persistance to the simulation. Parameters ---------- path : str or None (default: None) path for the container. If None (the default), the data lives only in memory (and are available with `simulation.container`) mode : str, optional "a" or "w" (default "w") save : str, optional "all" will save every time-step, "last" will only get the last time step nbuffer : int, optional wait until nbuffer data in the Queue before saving on disk. save_interval : int, optional wait until save_interval since last flush before saving on disk. force : bool, optional (default False) if True, remove the target folder if not empty. if False, raise an error. """ if path is None: self._container = MemoryContainer( None, save=save, mode=mode, force=force, nbuffer=nbuffer, save_interval=save_interval, ) elif backend == "netcdf": self._container = NetCDFContainer( "%s/%s" % (path, self.id), save=save, mode=mode, force=force, nbuffer=nbuffer, save_interval=save_interval, ) elif backend == "zarr": self._container = ZarrContainer( "%s/%s.zarr" % (path, self.id), save=save, mode=mode, force=force, nbuffer=nbuffer, save_interval=save_interval, ) else: raise ValueError("Backend should be either `netcdf` or `zarr`.") self._container.connect(self.stream) return self._container
@property def post_processes(self): """list of skfdiff.core.simulation.PostProcess: contain all the post processing function attached to the simulation. """ return self._pprocesses @property def stream(self): """ streamz.Stream: Streamz starting point, fed by the simulation state after each time_step. This interface is used for post-processing, saving the data on disk by the Container and display the fields in real-time. """ return self._stream @property def container(self): """skfdiff.Container: give access to the attached container, if any. """ return self._container @property def container_data(self): """skfdiff.Container: give access to the attached data in the attached container, if any. """ return self._container.data
[docs] def display(self, keys="unknowns", n=1, dim_allowed=(0, 1, 2)): return Display.display_fields(self, keys=keys, n=n, dim_allowed=dim_allowed)
[docs] def display_custom(self, plot_function, n=1): return Display.display_custom(self, plot_function, n=n)
@property def timer(self): """ skfdiff.core.simulation.Timer: the cpu time of the previous step and the total running time of the simulation. """ return Timer(self._last_running, self._total_running) @property def plugins_timer(self): """ skfdiff.core.simulation.Timer: the cpu time of the plugins (post process, data save, display...). """ return Timer(self._last_plugins_running, self._total_plugins_running)
[docs] def add_post_process(self, name, post_process, description=""): """add a post-process Parameters ---------- name : str name of the post-traitment post_process : callback (function of a class with a __call__ method or a streamz.Stream). this callback have to accept the simulation state as parameter and return the modifield simulation state. if a streamz.Stream is provided, it will me plugged_in with the previous streamz (and ultimately to the initial_stream). All these stream accept and return the simulation state. description : str, optional, Default is "". give extra information about the post-processing """ self._pprocesses.append( PostProcess(name=name, function=post_process, description=description) ) self._pprocesses[-1].function(self)
[docs] def remove_post_process(self, name): """remove a post-process Parameters ---------- name : str name of the post-process to remove. """ self._pprocesses = [ post_process for post_process in self._pprocesses if post_process.name != name ]
def __iter__(self): return self.compute() def __next__(self): return next(self._iterator) def __reduce__(self): return ( _reduce_simulation, ( self.model, self.fields, self.dt, self.t, self.tmax, self.i, self.id, cloudpickle.dumps(self._pprocesses), cloudpickle.dumps(self._scheme), self.status, self._total_running, self._last_running, self._created_timestamp, self._started_timestamp, self._last_timestamp, self._actual_timestamp, self._hook, self._container, ), )