Source code for skfdiff.model

#!/usr/bin/env python
# coding=utf-8

import sys
import warnings
from copy import deepcopy
from itertools import chain

import forge
import numpy as np
import cloudpickle

# from loguru import logger
from xarray import Dataset

from .core.backends import get_backend
from .core.grid_builder import GridBuilder
from .core.system import PDESys
from .simulation import Simulation, null_hook


sys.setrecursionlimit(40000)
EPS = 1e-6


[docs]def create_fields(coords, unks, pars): unk_names = [unk.name for unk in unks] ivar_names = [coord.name for coord in coords] par_names = [par.name for par in pars] @forge.sign( *[forge.kwarg(name=name) for name in chain(ivar_names, unk_names, par_names)] ) def Fields(**kwargs): """Create a xarray Dataset as expected by the model input.""" return Dataset( data_vars={ unk.name: ( unk.c_names, np.broadcast_to( kwargs[unk.name], (kwargs[coord].size for coord in unk.c_names) ).copy(), ) for unk in chain(unks, pars) }, coords={coord.name: kwargs[coord.name] for coord in coords}, ) return Fields
[docs]class Model: r"""Contain finite difference approximation and routine of the dynamical system. Take a mathematical form as input, use Sympy to transform it as a symbolic expression, perform the finite difference approximation and expose theano optimized routine for both the right hand side of the dynamical system and Jacobian matrix approximation. Parameters ---------- evolution_equations : Union[Sequence[str], str] the right hand sides of the partial differential equations written as :math:`\\frac{\\partial U}{\\partial t} = F(U)`, where the spatial derivative can be written as `dxxU` or `dx(U, 2)` or with the sympy notation `Derivative(U, x, x)` unknowns : Union[Sequence[str], str] the dependent variables with the same order as the temporal derivative of the previous arg. parameters : Optional[Union[Sequence[str], str]] list of the parameters. Can be feed with a scalar of an array with the same size. They can be derivated in space as well. boundary_conditions : Optional[Union[str, Dict[str, Dict[str, Tuple[str, str]]]]] Can be either "noflux" (default behavior), "periodic", or a dictionary that follow this structure : {dependent_var: {indep_var: (left_boundary, right_boundary)}}, the boundaries being written as residual (rhs - lhs = 0). For example, an hybrid Dirichlet / Neumann flux will be written as {"U": {"x": (dxU - 2, U - 3)}}. If a boundary is None, a nul flux will be applied. subs : Optional[Dict[str, str]] Substitution dictionnary, useful to write complex systems. The key of this dictionnary will be substitued everywhere in the evolution equations as well as in boundary conditions. backend: str, default to "numpy" A registered backend that will take the discretized system and expose the evolution equations routine ``F`` as well as an optional Jacobian routine ``J``. If the later is not provided by the backend, the implicit methods will not be available. TODO: make an efficient jacobian approximation for that case, given the bandwidth. For now, there is the ``numpy`` backend, that provide efficient (but not optimal) computation with minimal dependencies, and ``numba`` backend that provide a LLVM based routine that allow faster parallel computation at the costs of a (sometime long) warm-up. The numba one is less tested and stable, but provide a potential huge speed-up for explicit methods. backend_kwargs: Optional[Dict] : supplementary arguments provided to the backend. Examples -------- A simple diffusion equation: >>> from skfdiff import Model >>> model = Model("k * dxxU", "U", "k") A coupled system of convection-diffusion equation: >>> from skfdiff import Model >>> model = Model(["k1 * dxxU - c1 * dxV", ... "k2 * dxxV - c2 * dxU",], ... ["U", "V"], ["k1", "k2", "c1", "c2"]) A 2D pure advection model, without / with upwind scheme. >>> from skfdiff import Model >>> model = Model("c_x * dxU + c_y * dyU", "U(x, y)", ... ["c_x", "c_y"]) >>> model = Model("upwind(c_x, U, x, 2) + upwind(c_y, U, y, 2)", ... "U(x, y)", ... ["c_x", "c_y"]) A 2D diffusion model, with hybrid boundary conditions (Dirichlet, Neuman, Robin and No-flux). >>> from skfdiff import Model >>> model = Model("dxxU + dyyU", "U(x, y)", ... boundary_conditions={("U", "x"): ("U - 3", "dxU + 5"), ... ("U", "y"): ("dyU - (U - 3)", None) ... }) """ # noqa def __init__( self, evolution_equations, unknowns, parameters=None, coordinates=None, boundary_conditions=None, subs=None, custom_functions=None, backend="numpy", backend_kwargs=None, ): if parameters is None: parameters = [] if coordinates is None: coordinates = [] if boundary_conditions is None: boundary_conditions = {} if subs is None: subs = {} if backend_kwargs is None: backend_kwargs = {} if custom_functions is None: custom_functions = {} self.evolution_equations = evolution_equations self.unknowns = deepcopy(unknowns) self.parameters = deepcopy(parameters) self.coordinates = deepcopy(coordinates) self.boundary_conditions = deepcopy(boundary_conditions) self.subs = deepcopy(subs) self.custom_functions = deepcopy(custom_functions) self.pdesys = PDESys( evolution_equations=self.evolution_equations, unknowns=self.unknowns, coordinates=self.coordinates, parameters=self.parameters, boundary_conditions=self.boundary_conditions, subs=self.subs, custom_functions=custom_functions, ) self.grid_builder = GridBuilder(self.pdesys) self.backend = get_backend(backend)( self.pdesys, self.grid_builder, custom_functions=custom_functions, **backend_kwargs ) @property def Fields(self): """Base for structured fields stored in a xarray.""" return create_fields( self.pdesys.coordinates, self.pdesys.unknowns, self.pdesys.parameters )
[docs] def F(self, fields, t=0): """Compute the right hand side of the dynamical system""" return self.backend.F(fields, t=t)
[docs] def J(self, fields, t=0): """Compute the Jacobian of the dynamical system as sparce csc matrix""" try: return self.backend.J(fields, t=t) except AttributeError: raise AttributeError( "current backend %s lack of routine to compute the jacobian matrix. " "Only explicite scheme will be working." % self.backend.name )
[docs] def U_from_fields(self, fields, t=0): """Get a fields and return the data as a single flat vector.""" return self.grid_builder.U_from_fields(fields, t=t)
[docs] def fields_from_U(self, U, fields, t=0): """take a solution ad single flat vector and return structured fields.""" return self.grid_builder.fields_from_U(U, fields, t=t)
def __getstate__(self): return {key: cloudpickle.dumps(value) for key, value in self.__dict__.items()} def __setstate__(self, state): self.__dict__.update( {key: cloudpickle.loads(value) for key, value in state.items()} ) def __repr__(self): if not self.boundary_conditions: bdc = "noflux" elif isinstance(self.boundary_conditions, dict): bdc = "\n".join(map(str, self.boundary_conditions.items())) else: bdc = self.boundary_conditions return """System ------- {pdesys} Boundary condition ------------------ {bdc} backend ------- {backend} """.format( pdesys="\n\n".join( [ "dt%s = %s" % (unk, pde.equation) for unk, pde in zip(self.unknowns, self.pdesys) ] ), bdc=bdc, backend=self.backend.name, )
[docs] def fields_template(self, *args, **kwargs): warnings.warn( "This interface is deprecied, use model.Fields instead. Will be removed in the 0.7.0", DeprecationWarning, ) return self.Fields(*args, **kwargs)
[docs] def init_simulation( self, fields, dt, t=0, tmax=None, id=None, hook=null_hook, scheme="RODASPR", time_stepping=True, **kwargs ): """Init a skfdiff Simulation. See skfdiff.simulation.Simulation docstring for further details. """ return Simulation( model=self, fields=fields, dt=dt, t=t, tmax=tmax, id=id, hook=hook, scheme=scheme, time_stepping=time_stepping, **kwargs )