Source code for skfdiff.core.backends.numpy_backend

#!/usr/bin/env python
# coding=utf-8

from functools import partial
from operator import attrgetter

import attr
import numpy as np
from cachetools import cachedmethod
from loguru import logger
from scipy.sparse import csc_matrix
from sympy import Symbol, lambdify, cse

from .base_backend import Backend, register_backend


[docs]def np_depvar_printer(unk): return Symbol(str(unk.discrete))
[docs]def np_ivar_printer(coord): return (Symbol(str(coord.discrete)), Symbol(str(coord.idx)), coord.step, coord.N)
[docs]def np_Min(args): min_ = np.fmin.reduce(np.broadcast_arrays(*args)) return min_
[docs]def np_Max(args): max_ = np.fmax.reduce(np.broadcast_arrays(*args)) return max_
[docs]def np_Heaviside(a): return np.where((a < 0) | np.isnan(a), 0, 1)
[docs]class CseLambdify: def __init__(self, args, expr, optimizations=None, order="none", modules="numpy"): self.args = args self.expr = expr try: subs, reduced = cse(expr, order=order, optimizations=optimizations) except AttributeError: subs = {} reduced = self.expr self.subs = subs self.reduced_expr = reduced[0] self.lambda_subs = {} for key, sub in self.subs: self.lambda_subs[str(key)] = lambdify( [*self.args, *self.lambda_subs.keys()], sub, modules=modules ) self.lambda_expr = lambdify( [*self.args, *self.lambda_subs.keys()], self.reduced_expr, modules=modules ) def __call__(self, *args): evaluated = dict(zip(map(str, self.args), args)) for key, func in self.lambda_subs.items(): evaluated[key] = func(**evaluated) evaluated_expr = self.lambda_expr(**evaluated) return evaluated_expr
[docs]@register_backend @attr.s class NumpyBackend(Backend): name = "numpy" cse = attr.ib(type=bool, default=True) optimizations = attr.ib(type=str, default="basic") def __attrs_post_init__(self): customs = {key: func.numpy_eval for key, func in self.custom_functions.items()} modules = [ {"amax": np_Max, "amin": np_Min, "Heaviside": np_Heaviside}, customs, "scipy", ] if self.cse: self.lambdify = partial( CseLambdify, optimizations=self.optimizations, modules=modules ) else: self.lambdify = partial(lambdify, modules=modules) logger.info("numpy backend: convert_inputs...") self._convert_inputs() logger.info("numpy backend: build_evolution_equations...") self._build_evolution_equations() logger.info("numpy backend: build_jacobian...") self._cache_jac_coord = {} self._build_jacobian() def _convert_inputs(self): self.ndim = len(self.system.coordinates) self.unks = list(map(np_depvar_printer, self.system.unknowns)) self.pars = list(map(np_depvar_printer, self.system.parameters)) self.coords, self.idxs, self.steps, self.sizes = zip( *map(np_ivar_printer, self.system.coordinates) ) self.idxs_map = [ tuple([self.system.coordinates.index(coord) for coord in unk.coordinates]) for unk in self.system.unknowns ] self.shapes = [self.system.shapes[unk] for unk in self.system.unknowns] self.inputs = [ self.system._t, *self.unks, *self.pars, *self.coords, *self.idxs, *self.steps, *self.sizes, ] self.inputs_cond = [*self.idxs, *self.sizes] def _build_evolution_equations(self): self.evolution_equations = [ self.lambdify(self.inputs, expr.n()) for expr in self.system.piecewise_system ]
[docs] def compute_F_vector(self, t, unks, pars, coords, sizes): subgrids = self.grid_builder.compute_subgrids(sizes) steps = self.grid_builder.compute_steps( tuple(sizes), tuple([coord.ptp() for coord in coords]) ) system_sizes = self.grid_builder.compute_sizes(sizes) system_size = sum(system_sizes) F = np.empty(system_size) for grid, eq in zip(subgrids, self.evolution_equations): Fi = eq(t, *unks, *pars, *coords, *grid[:, 1:-2].T, *steps, *sizes) F[grid[:, -1]] = Fi return F
[docs] def F(self, fields, t=0): unks = [ fields[varname].values for varname in [unk.name for unk in self.system.unknowns] ] pars = [ fields[varname].values for varname in [par.name for par in self.system.parameters] ] coords = [ fields[varname].values for varname in [coord.name for coord in self.system.coordinates] ] sizes = [coord.size for coord in coords] return self.compute_F_vector(t, unks, pars, coords, sizes)
def _build_jacobian(self): self.jacobian_routines = [ [self.lambdify(self.inputs, diff) for diff in jac_row] for jac_row in self.system.jacobian_values ] self.jacobian_columns = [ [lambdify(self.inputs_cond, col) for col in columns] for columns in self.system.jacobian_columns ]
[docs] def compute_jacobian_values(self, t, unks, pars, coords, sizes): subgrids = self.grid_builder.compute_subgrids(sizes) steps = self.grid_builder.compute_steps( tuple(sizes), tuple([coord.ptp() for coord in coords]) ) data_size = sum( [ subgrid.shape[0] * len(jacs) for subgrid, jacs in zip(subgrids, self.jacobian_columns) ] ) data = np.zeros(data_size) cursor = 0 for grid, jacs in zip(subgrids, self.jacobian_routines): for jac_func in jacs: next_cursor = cursor + grid.shape[0] jac = jac_func( t, *unks, *pars, *coords, *grid[:, 1:-2].T, *steps, *sizes ) data[cursor:next_cursor] = jac cursor = next_cursor return data
[docs] @cachedmethod(cache=attrgetter("_cache_jac_coord")) def compute_jacobian_coordinates(self, *sizes): subgrids = self.grid_builder.compute_subgrids(sizes) system_sizes = self.grid_builder.compute_sizes(sizes) system_size = sum(system_sizes) rows_list = [] cols_list = [] for grid, jac_cols in zip(subgrids, self.jacobian_columns): for col_func in jac_cols: cols_ = np.zeros((grid.shape[0], self.ndim + 1), dtype="int32") cols = col_func(*grid[:, 1:-2].T, *sizes) cols = np.stack( [np.broadcast_to(col, cols_.shape[:-1]) for col in cols] ) flat_cols = self.grid_builder.compute_flat_from_idxs(cols.T, sizes) rows_list.extend(grid[:, -1].reshape((-1,))) cols_list.extend(flat_cols.reshape((-1,))) rows = np.array(rows_list) cols = np.array(cols_list) perm = np.argsort(cols) perm_rows = rows[perm] perm_cols = cols[perm] count = np.zeros((system_size + 1), dtype="int32") uq, cnt = np.unique(perm_cols, False, False, True) count[uq + 1] = cnt indptr = np.cumsum(count) return rows, cols, perm_rows, indptr, perm, (system_size, system_size)
[docs] def J(self, fields, t=0): unks = [ fields[varname].values for varname in [unk.name for unk in self.system.unknowns] ] pars = [ fields[varname].values for varname in [par.name for par in self.system.parameters] ] coords = [ fields[varname].values for varname in [coord.name for coord in self.system.coordinates] ] sizes = [coord.size for coord in coords] data = self.compute_jacobian_values(t, unks, pars, coords, sizes) _, _, perm_rows, indptr, perm, shape = self.compute_jacobian_coordinates(*sizes) return csc_matrix((data[perm], perm_rows, indptr), shape=shape)