Source code for mckit_meshes.utils.rebin

"""Functions for rebinning histogram-like distributions."""

# TODO @dvp: implement propagation in result the indexes computed on shrink
#            for reuse in FMesh.shrink for equivalent grids or alike
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import collections.abc
import gc
import itertools

from multiprocessing import Pool

import numpy as np

if TYPE_CHECKING:
    from collections.abc import Iterable, Sequence

    from numpy.typing import ArrayLike, NDArray

    ArrayFloat = NDArray[Any, float]


__all__ = [
    "interpolate",
    "is_monotonically_increasing",
    "rebin_1d",
    "rebin_nd",
    "rebin_spec_composer",
    "shrink_1d",
    "shrink_nd",
    "trim_spec_composer",
]

__revision__ = "$Id$"

__ZERO = np.array([0.0], dtype=float)
__EXTERNAL_PROCESS_THRESHOLD = 1000000


# noinspection PyTypeChecker
[docs] def is_monotonically_increasing(a: NDArray) -> bool: # noinspection PyUnresolvedReferences if not a.size: return False iterator = iter(a) prev = next(iterator) for val in iterator: if prev < val: prev = val else: return False return True
# noinspection PyUnresolvedReferences def set_axis(indices: ArrayLike, axis: int, a_shape: Sequence[int]) -> ArrayLike: shape = [1] * len(a_shape) shape[axis] = a_shape[axis] return indices.reshape(tuple(shape)) # noinspection PyUnresolvedReferences,PyTypeChecker
[docs] def interpolate( x_new: ArrayFloat, x: ArrayFloat, y: ArrayFloat, axis: int | None = None ) -> ArrayFloat: if y.ndim == 1: return np.interp(x_new, x, y) if axis is None: axis = 0 x_new_indices = np.digitize(x_new, x) x_new_indices = x_new_indices.clip(1, len(x) - 1).astype(int) # TODO @dvp: why astype? lo = x_new_indices - 1 hi = x_new_indices x_lo = x[lo] deltas = x[hi] - x_lo nd = y.ndim slice1 = [slice(None)] * nd # TODO @dvp: suspicious usage of slice duplicates slice2 = [slice(None)] * nd slice1[axis] = lo slice2[axis] = hi slice1 = tuple(slice1) slice2 = tuple(slice2) y_lo = y[slice1] y_deltas = y[slice2] - y_lo deltas = set_axis(deltas, axis, y_deltas.shape) slope = y_deltas / deltas new_deltas = x_new - x_lo new_deltas = set_axis(new_deltas, axis, slope.shape) return slope * new_deltas + y_lo
# noinspection PyUnresolvedReferences
[docs] def rebin_1d( a: np.ndarray, bins: np.ndarray, new_bins: np.ndarray, axis: int = 0, *, grouped: bool = False, assume_sorted: bool = False, ) -> np.ndarray: """Transforms 1-D histogram defined as `data` on the limiting points. define like `bins` to equivalent (see the terms below) histogram defined on other limiting points defined as `new_bins`. Notes: The algorithm maintains the equality of integral on intervals defined on new_bins for the original and rebinned distributions. Args: a: The array to rebin bins: Defines 1-D array representing `a` binning along the given `axis new_bins: The new binning required. axis: int, optional An axis along which to rebin array `a` grouped: bool, optional Defines the approach for rebinning. - If `True`, then the values in `a` represent the data already integrated over bins, like in energy group distributions. On rebinning maintain equivalence of integral over same energy range in old and new bins. - If `False` (default), as for spatial binning - maintain the same average value in the same volume in old and new bins. assume_sorted: bool, optional If True, then skip assertion of bins sorting order, by default False - asserts the input_file data Returns: rebinned data """ assert bins[0] <= new_bins[0], ( "Rebinning doesn't provide extrapolation lower of the original bins" ) assert new_bins[-1] <= bins[-1], ( "Rebinning doesn't provide extrapolation upper of the original bins" ) assert bins.size == a.shape[axis] + 1, ( "The `a` array shape doesn't match the given bins and axis" ) assert assume_sorted or is_monotonically_increasing(bins) assert assume_sorted or is_monotonically_increasing(new_bins) ndim = a.ndim if grouped: t = a else: diff = np.diff(bins) if ndim > 1: diffs_shape = [1] * ndim diffs_shape[axis] = a.shape[axis] diffs_shape = tuple(diffs_shape) diff = diff.reshape(diffs_shape) t = a * diff cum = np.cumsum(t, axis=axis) cum = np.insert(cum, 0, __ZERO, axis=axis) rebinned_data = interpolate(new_bins, bins, cum, axis=axis) rebinned_data = np.diff(rebinned_data, axis=axis) # del cum if a.size > __EXTERNAL_PROCESS_THRESHOLD: gc.collect() del gc.garbage[:] if not grouped: new_diff = np.diff(new_bins) if ndim > 1: diffs_shape = [1] * ndim diffs_shape[axis] = rebinned_data.shape[axis] new_diff = new_diff.reshape(tuple(diffs_shape)) rebinned_data /= new_diff return rebinned_data
[docs] def rebin_nd( a: NDArray[float], rebin_spec: Iterable[tuple[NDArray[float], NDArray[float], int, bool]], *, assume_sorted: bool = False, external_process_threshold: int = __EXTERNAL_PROCESS_THRESHOLD, ) -> NDArray[float]: """Rebin an array `a` over multidimensional grid. Args: a: An array to rebin. rebin_spec: Iterator An iterator listing tuples specifying bins, new_bins, axis and grouped parameters for rebinning. See :py:func:`rebin_1d` for details on the parameters. assume_sorted: bool, optional If True skip assertion of bins sorting order, by default False - asserts the input_file data external_process_threshold: int If size of `a` is greater than that, then the computation is executed in external process, to achieve immediate memory cleanup. Returns: Rebinned data. """ if not isinstance(rebin_spec, collections.abc.Iterator): rebin_spec = iter(rebin_spec) try: bins, new_bins, axis, grouped = next(rebin_spec) except StopIteration: return a if a.size > external_process_threshold: with Pool(processes=1) as pool: recursion_res = pool.apply( rebin_nd, args=(a, rebin_spec), kwds={"assume_sorted": assume_sorted} ) else: recursion_res = rebin_nd(a, rebin_spec, assume_sorted=assume_sorted) res = rebin_1d( recursion_res, bins, new_bins, axis, grouped=grouped, assume_sorted=assume_sorted ) del recursion_res if a.size > external_process_threshold: n = gc.collect() if n: del gc.garbage[:] return res
[docs] def rebin_spec_composer( bins_seq, new_bins_seq, axes=None, grouped_flags=None, ) -> Iterable[tuple[NDArray[float], NDArray[float], NDArray[float], NDArray[float]]]: """Compose rebin_spec parameter. See also :py:func:`mckit_meshes.utils.rebin.rebin_nd` with reasonable defaults for axes and grouped iterators. Args: bins_seq: sequence of ndarrays Iterates over the list of original bins new_bins_seq: sequence of ndarrays Iterates over the list of new bins. axes: sequence of ints, optional Iterates over the list of corresponding axes. If not provided (default), then iterates over sequence 0 ... len(bins). grouped_flags: sequence of booleans, optional Iterates over a sequence of grouped flags. If not provided (default), then all the axes considered as not grouped. If constant boolean value is proved, then for all the axes this value is applied. Returns: Iterator over the sequence of tuples (bins, new_bins, axis, grouped) """ if not axes: axes = itertools.count() if isinstance(grouped_flags, bool): grouped_flags = itertools.repeat(grouped_flags) elif not grouped_flags: grouped_flags = itertools.repeat(False) return zip(bins_seq, new_bins_seq, axes, grouped_flags, strict=False)
# @numba.jit
[docs] def shrink_1d( a: np.ndarray, bins: np.ndarray, low: float | None = None, high: float | None = None, axis: int | None = None, *, assume_sorted: bool = False, ) -> tuple[np.ndarray, np.ndarray]: """Select sub-arrays of a `a` and corresponding `bins` for minimal span. of bins, which completely covers the range [`low`...`high`] both sides included. Args: a: ndarray An array to shrink. bins: ndarray Bins corresponding to the grid `a` over the given `axis`. low: float, optional Left edge of the range to shrink to. When omitted, the `bins` left edge is used. high: float Right edge of the range to shrink to. When omitted, the `bins` right edge is used. axis: int, optional An axis of `a` over which to shrink. Default axis = 0. assume_sorted: bool, optional If True skip assertion of bins sorting order, by default False - asserts the input_file data Returns: new_bins: ndarray The shrank bins new_data: ndarray The shrank grid """ if low is None and high is None: return bins, a if axis is None: axis = 0 assert a.shape[axis] == bins.size - 1 assert assume_sorted or is_monotonically_increasing(bins) if low is None: low = bins[0] if high is None: high = bins[-1] if low == bins[0] and high == bins[-1]: return bins, a if low < bins[0] or bins[-1] < low: msg = ( f"Low shrink edge is beyond the bins range: {low:g}" f" is not in [{bins[0]:g}..{bins[-1]:g}]", ) raise ValueError(msg) if high < bins[0] or bins[-1] < high: msg = f"High shrink edge is beyond the bins range: {high} is not in [{bins[0]}..{bins[-1]}]" raise ValueError(msg) left_idx, right_idx = np.digitize([low, high], bins) - 1 if left_idx > 0 and bins[left_idx] > low: left_idx -= 1 if right_idx < bins.size - 1 and bins[right_idx] < high: right_idx += 1 if right_idx - left_idx < 1: raise ValueError("Shrink results to empty grid") if left_idx == 0 and right_idx == bins.size: return bins, a indices = list(range(left_idx, right_idx + 1)) new_bins = np.take(bins, indices) new_a = np.take(a, indices[:-1], axis=axis) return new_bins, new_a
[docs] def shrink_nd( a: np.ndarray, trim_spec: Iterable[tuple[np.ndarray, float, float, int]], *, assume_sorted: bool = False, ) -> tuple[list[np.ndarray] | None, np.ndarray]: """Perform multidimensional shrink. Args: a: The grid to shrink. trim_spec: sequence of tuples (bins, low, high, axis) assume_sorted: If True skip assertion of bins sorting order, by default False - asserts the input_file data Returns: A sequence with new bins, if any, the shrunk or initial grid. """ if not isinstance(trim_spec, collections.abc.Iterator): trim_spec = iter(trim_spec) try: bins, left, right, axis = next(trim_spec) except StopIteration: return None, a new_bins_seq, recursed_data = shrink_nd(a, trim_spec, assume_sorted=assume_sorted) top_bins, top_data = shrink_1d( recursed_data, bins, left, right, axis, assume_sorted=assume_sorted ) new_bins_seq = [top_bins, *new_bins_seq] if new_bins_seq else [top_bins] return new_bins_seq, top_data
[docs] def trim_spec_composer( bins_seq, lefts=None, rights=None, axes=None, ) -> Iterable[tuple[NDArray[float], float, float, int]]: """Helps to compose trim_spec parameter in. :func:`mckit_meshes.utils.rebin.trim_nd` with reasonable defaults for lefts, rights and axes iterators. Args: bins_seq: sequence of ndarrays Iterates over the list of bins associated with a grid to be trimmed. lefts: sequence of floats Iterates over the list of left edges for trimming. rights: sequence of floats Iterates over the list of right edges for trimming. axes: sequence of ints, optional Iterates over the list of corresponding axes. If not provided (default), then iterates over sequence 0..len(bins). Returns: Iterator over the sequence of tuples (bins, lefts, rights, axis) """ if not lefts: lefts = itertools.repeat(None) if not rights: rights = itertools.repeat(None) if not axes: axes = itertools.count() return zip(bins_seq, lefts, rights, axes, strict=False)