Source code for skfdiff.core.backends.numpy_backend

#!/usr/bin/env python
# coding=utf-8

from functools import partial
from operator import attrgetter

import attr
import numpy as np
from loguru import logger
from scipy.sparse import csc_matrix
from sympy import lambdify, Symbol
from cachetools import cachedmethod

from .base_backend import Backend, register_backend


[docs]def np_depvar_printer(unk): return Symbol(str(unk.discrete))
[docs]def np_ivar_printer(coord): return (Symbol(str(coord.discrete)), Symbol(str(coord.idx)), coord.step, coord.N)
[docs]def np_Min(args): a, b = args return np.where(a < b, a, b)
[docs]def np_Max(args): a, b = args return np.where(a < b, b, a)
[docs]def np_Heaviside(a): return np.where(a < 0, 0, 1)
lambdify = partial( lambdify, modules=[{"amax": np_Max, "amin": np_Min, "Heaviside": np_Heaviside}, "scipy"], )
[docs]@register_backend @attr.s class NumpyBackend(Backend): name = "numpy" def __attrs_post_init__(self): logger.info("numpy backend: convert_inputs...") self._convert_inputs() logger.info("numpy backend: build_evolution_equations...") self._build_evolution_equations() logger.info("numpy backend: build_jacobian...") self._cache_jac_coord = {} self._build_jacobian() def _convert_inputs(self): self.ndim = len(self.system.coordinates) self.unks = list(map(np_depvar_printer, self.system.unknowns)) self.pars = list(map(np_depvar_printer, self.system.parameters)) self.coords, self.idxs, self.steps, self.sizes = zip( *map(np_ivar_printer, self.system.coordinates) ) self.idxs_map = [ tuple([self.system.coordinates.index(coord) for coord in unk.coordinates]) for unk in self.system.unknowns ] self.shapes = [self.system.shapes[unk] for unk in self.system.unknowns] self.inputs = [ self.system._t, *self.unks, *self.pars, *self.coords, *self.idxs, *self.steps, *self.sizes, ] self.inputs_cond = [*self.idxs, *self.sizes] def _build_evolution_equations(self): self.evolution_equations = [ lambdify(self.inputs, expr.n()) for expr in self.system.piecewise_system ]
[docs] def compute_F_vector(self, t, unks, pars, coords, sizes): subgrids = self.grid_builder.compute_subgrids(sizes) steps = self.grid_builder.compute_steps( tuple(sizes), tuple([coord.ptp() for coord in coords]) ) system_sizes = self.grid_builder.compute_sizes(sizes) system_size = sum(system_sizes) F = np.empty(system_size) for grid, eq in zip(subgrids, self.evolution_equations): Fi = eq(t, *unks, *pars, *coords, *grid[:, 1:-2].T, *steps, *sizes) F[grid[:, -1]] = Fi return F
[docs] def F(self, fields, t=0): unks = [ fields[varname].values for varname in [unk.name for unk in self.system.unknowns] ] pars = [ fields[varname].values for varname in [par.name for par in self.system.parameters] ] coords = [ fields[varname].values for varname in [coord.name for coord in self.system.coordinates] ] sizes = [coord.size for coord in coords] return self.compute_F_vector(t, unks, pars, coords, sizes)
def _build_jacobian(self): self.jacobian_routines = [ [lambdify(self.inputs, diff) for diff in jac_row] for jac_row in self.system.jacobian_values ] self.jacobian_columns = [ [lambdify(self.inputs_cond, col) for col in columns] for columns in self.system.jacobian_columns ]
[docs] def compute_jacobian_values(self, t, unks, pars, coords, sizes): subgrids = self.grid_builder.compute_subgrids(sizes) steps = self.grid_builder.compute_steps( tuple(sizes), tuple([coord.ptp() for coord in coords]) ) data_size = sum( [ subgrid.shape[0] * len(jacs) for subgrid, jacs in zip(subgrids, self.jacobian_columns) ] ) data = np.zeros(data_size) cursor = 0 for grid, jacs in zip(subgrids, self.jacobian_routines): for jac_func in jacs: next_cursor = cursor + grid.shape[0] jac = jac_func( t, *unks, *pars, *coords, *grid[:, 1:-2].T, *steps, *sizes ) data[cursor:next_cursor] = jac cursor = next_cursor return data
[docs] @cachedmethod(cache=attrgetter("_cache_jac_coord")) def compute_jacobian_coordinates(self, *sizes): subgrids = self.grid_builder.compute_subgrids(sizes) system_sizes = self.grid_builder.compute_sizes(sizes) system_size = sum(system_sizes) rows_list = [] cols_list = [] for grid, jac_cols in zip(subgrids, self.jacobian_columns): for col_func in jac_cols: cols_ = np.zeros((grid.shape[0], self.ndim + 1), dtype="int32") cols = col_func(*grid[:, 1:-2].T, *sizes) cols = np.stack( [np.broadcast_to(col, cols_.shape[:-1]) for col in cols] ) flat_cols = self.grid_builder.compute_flat_from_idxs(cols.T, sizes) rows_list.extend(grid[:, -1].reshape((-1,))) cols_list.extend(flat_cols.reshape((-1,))) rows = np.array(rows_list) cols = np.array(cols_list) perm = np.argsort(cols) perm_rows = rows[perm] perm_cols = cols[perm] count = np.zeros((system_size + 1), dtype="int32") uq, cnt = np.unique(perm_cols, False, False, True) count[uq + 1] = cnt indptr = np.cumsum(count) return rows, cols, perm_rows, indptr, perm, (system_size, system_size)
[docs] def J(self, fields, t=0): unks = [ fields[varname].values for varname in [unk.name for unk in self.system.unknowns] ] pars = [ fields[varname].values for varname in [par.name for par in self.system.parameters] ] coords = [ fields[varname].values for varname in [coord.name for coord in self.system.coordinates] ] sizes = [coord.size for coord in coords] data = self.compute_jacobian_values(t, unks, pars, coords, sizes) _, _, perm_rows, indptr, perm, shape = self.compute_jacobian_coordinates(*sizes) return csc_matrix((data[perm], perm_rows, indptr), shape=shape)