Source code for dpctl.tensor._indexing_functions

#                       Data Parallel Control (dpctl)
#
#  Copyright 2020-2024 Intel Corporation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import operator

import dpctl
import dpctl.tensor as dpt
import dpctl.tensor._tensor_impl as ti
import dpctl.utils

from ._copy_utils import (
    _extract_impl,
    _nonzero_impl,
    _put_multi_index,
    _take_multi_index,
)
from ._numpy_helper import normalize_axis_index


def _get_indexing_mode(name):
    modes = {"wrap": 0, "clip": 1}
    try:
        return modes[name]
    except KeyError:
        raise ValueError(
            "`mode` must be `wrap` or `clip`." "Got `{}`.".format(name)
        )


[docs]def take(x, indices, /, *, axis=None, mode="wrap"): """take(x, indices, axis=None, mode="wrap") Takes elements from an array along a given axis at given indices. Args: x (usm_ndarray): The array that elements will be taken from. indices (usm_ndarray): One-dimensional array of indices. axis (int, optional): The axis along which the values will be selected. If ``x`` is one-dimensional, this argument is optional. Default: ``None``. mode (str, optional): How out-of-bounds indices will be handled. Possible values are: - ``"wrap"``: clamps indices to (``-n <= i < n``), then wraps negative indices. - ``"clip"``: clips indices to (``0 <= i < n``). Default: ``"wrap"``. Returns: usm_ndarray: Array with shape ``x.shape[:axis] + indices.shape + x.shape[axis + 1:]`` filled with elements from ``x``. """ if not isinstance(x, dpt.usm_ndarray): raise TypeError( "Expected instance of `dpt.usm_ndarray`, got `{}`.".format(type(x)) ) if not isinstance(indices, dpt.usm_ndarray): raise TypeError( "`indices` expected `dpt.usm_ndarray`, got `{}`.".format( type(indices) ) ) if indices.dtype.kind not in "ui": raise IndexError( "`indices` expected integer data type, got `{}`".format( indices.dtype ) ) if indices.ndim != 1: raise ValueError( "`indices` expected a 1D array, got `{}`".format(indices.ndim) ) exec_q = dpctl.utils.get_execution_queue([x.sycl_queue, indices.sycl_queue]) if exec_q is None: raise dpctl.utils.ExecutionPlacementError res_usm_type = dpctl.utils.get_coerced_usm_type( [x.usm_type, indices.usm_type] ) mode = _get_indexing_mode(mode) x_ndim = x.ndim if axis is None: if x_ndim > 1: raise ValueError( "`axis` cannot be `None` for array of dimension `{}`".format( x_ndim ) ) axis = 0 if x_ndim > 0: axis = normalize_axis_index(operator.index(axis), x_ndim) x_sh = x.shape if x_sh[axis] == 0 and indices.size != 0: raise IndexError("cannot take non-empty indices from an empty axis") res_shape = x.shape[:axis] + indices.shape + x.shape[axis + 1 :] else: if axis != 0: raise ValueError("`axis` must be 0 for an array of dimension 0.") res_shape = indices.shape res = dpt.empty( res_shape, dtype=x.dtype, usm_type=res_usm_type, sycl_queue=exec_q ) _manager = dpctl.utils.SequentialOrderManager[exec_q] deps_ev = _manager.submitted_events hev, take_ev = ti._take( x, (indices,), res, axis, mode, sycl_queue=exec_q, depends=deps_ev ) _manager.add_event_pair(hev, take_ev) return res
[docs]def put(x, indices, vals, /, *, axis=None, mode="wrap"): """put(x, indices, vals, axis=None, mode="wrap") Puts values into an array along a given axis at given indices. Args: x (usm_ndarray): The array the values will be put into. indices (usm_ndarray): One-dimensional array of indices. vals (usm_ndarray): Array of values to be put into ``x``. Must be broadcastable to the result shape ``x.shape[:axis] + indices.shape + x.shape[axis+1:]``. axis (int, optional): The axis along which the values will be placed. If ``x`` is one-dimensional, this argument is optional. Default: ``None``. mode (str, optional): How out-of-bounds indices will be handled. Possible values are: - ``"wrap"``: clamps indices to (``-n <= i < n``), then wraps negative indices. - ``"clip"``: clips indices to (``0 <= i < n``). Default: ``"wrap"``. .. note:: If input array ``indices`` contains duplicates, a race condition occurs, and the value written into corresponding positions in ``x`` may vary from run to run. Preserving sequential semantics in handing the duplicates to achieve deterministic behavior requires additional work, e.g. :Example: .. code-block:: python from dpctl import tensor as dpt def put_vec_duplicates(vec, ind, vals): "Put values into vec, handling possible duplicates in ind" assert vec.ndim, ind.ndim, vals.ndim == 1, 1, 1 # find positions of last occurences of each # unique index ind_flipped = dpt.flip(ind) ind_uniq = dpt.unique_all(ind_flipped).indices has_dups = len(ind) != len(ind_uniq) if has_dups: ind_uniq = dpt.subtract(vec.size - 1, ind_uniq) ind = dpt.take(ind, ind_uniq) vals = dpt.take(vals, ind_uniq) dpt.put(vec, ind, vals) n = 512 ind = dpt.concat((dpt.arange(n), dpt.arange(n, -1, step=-1))) x = dpt.zeros(ind.size, dtype="int32") vals = dpt.arange(ind.size, dtype=x.dtype) # Values corresponding to last positions of # duplicate indices are written into the vector x put_vec_duplicates(x, ind, vals) parts = (vals[-1:-n-2:-1], dpt.zeros(n, dtype=x.dtype)) expected = dpt.concat(parts) assert dpt.all(x == expected) """ if not isinstance(x, dpt.usm_ndarray): raise TypeError( "Expected instance of `dpt.usm_ndarray`, got `{}`.".format(type(x)) ) if not isinstance(indices, dpt.usm_ndarray): raise TypeError( "`indices` expected `dpt.usm_ndarray`, got `{}`.".format( type(indices) ) ) if isinstance(vals, dpt.usm_ndarray): queues_ = [x.sycl_queue, indices.sycl_queue, vals.sycl_queue] usm_types_ = [x.usm_type, indices.usm_type, vals.usm_type] else: queues_ = [x.sycl_queue, indices.sycl_queue] usm_types_ = [x.usm_type, indices.usm_type] if indices.ndim != 1: raise ValueError( "`indices` expected a 1D array, got `{}`".format(indices.ndim) ) if indices.dtype.kind not in "ui": raise IndexError( "`indices` expected integer data type, got `{}`".format( indices.dtype ) ) exec_q = dpctl.utils.get_execution_queue(queues_) if exec_q is None: raise dpctl.utils.ExecutionPlacementError vals_usm_type = dpctl.utils.get_coerced_usm_type(usm_types_) mode = _get_indexing_mode(mode) x_ndim = x.ndim if axis is None: if x_ndim > 1: raise ValueError( "`axis` cannot be `None` for array of dimension `{}`".format( x_ndim ) ) axis = 0 if x_ndim > 0: axis = normalize_axis_index(operator.index(axis), x_ndim) x_sh = x.shape if x_sh[axis] == 0 and indices.size != 0: raise IndexError("cannot take non-empty indices from an empty axis") val_shape = x.shape[:axis] + indices.shape + x.shape[axis + 1 :] else: if axis != 0: raise ValueError("`axis` must be 0 for an array of dimension 0.") val_shape = indices.shape if not isinstance(vals, dpt.usm_ndarray): vals = dpt.asarray( vals, dtype=x.dtype, usm_type=vals_usm_type, sycl_queue=exec_q ) # choose to throw here for consistency with `place` if vals.size == 0: raise ValueError( "cannot put into non-empty indices along an empty axis" ) if vals.dtype == x.dtype: rhs = vals else: rhs = dpt.astype(vals, x.dtype) rhs = dpt.broadcast_to(rhs, val_shape) _manager = dpctl.utils.SequentialOrderManager[exec_q] deps_ev = _manager.submitted_events hev, put_ev = ti._put( x, (indices,), rhs, axis, mode, sycl_queue=exec_q, depends=deps_ev ) _manager.add_event_pair(hev, put_ev)
[docs]def extract(condition, arr): """extract(condition, arr) Returns the elements of an array that satisfies the condition. If ``condition`` is boolean ``dpctl.tensor.extract`` is equivalent to ``arr[condition]``. Note that ``dpctl.tensor.place`` does the opposite of ``dpctl.tensor.extract``. Args: conditions (usm_ndarray): An array whose non-zero or ``True`` entries indicate the element of ``arr`` to extract. arr (usm_ndarray): Input array of the same size as ``condition``. Returns: usm_ndarray: Rank 1 array of values from ``arr`` where ``condition`` is ``True``. """ if not isinstance(condition, dpt.usm_ndarray): raise TypeError( "Expecting dpctl.tensor.usm_ndarray type, " f"got {type(condition)}" ) if not isinstance(arr, dpt.usm_ndarray): raise TypeError( "Expecting dpctl.tensor.usm_ndarray type, " f"got {type(arr)}" ) exec_q = dpctl.utils.get_execution_queue( ( condition.sycl_queue, arr.sycl_queue, ) ) if exec_q is None: raise dpctl.utils.ExecutionPlacementError if condition.shape != arr.shape: raise ValueError("Arrays are not of the same size") return _extract_impl(arr, condition)
[docs]def place(arr, mask, vals): """place(arr, mask, vals) Change elements of an array based on conditional and input values. If ``mask`` is boolean ``dpctl.tensor.place`` is equivalent to ``arr[condition] = vals``. Args: arr (usm_ndarray): Array to put data into. mask (usm_ndarray): Boolean mask array. Must have the same size as ``arr``. vals (usm_ndarray, sequence): Values to put into ``arr``. Only the first N elements are used, where N is the number of True values in ``mask``. If ``vals`` is smaller than N, it will be repeated, and if elements of ``arr`` are to be masked, this sequence must be non-empty. Array ``vals`` must be one dimensional. """ if not isinstance(arr, dpt.usm_ndarray): raise TypeError( "Expecting dpctl.tensor.usm_ndarray type, " f"got {type(arr)}" ) if not isinstance(mask, dpt.usm_ndarray): raise TypeError( "Expecting dpctl.tensor.usm_ndarray type, " f"got {type(mask)}" ) if not isinstance(vals, dpt.usm_ndarray): raise TypeError( "Expecting dpctl.tensor.usm_ndarray type, " f"got {type(vals)}" ) exec_q = dpctl.utils.get_execution_queue( ( arr.sycl_queue, mask.sycl_queue, vals.sycl_queue, ) ) if exec_q is None: raise dpctl.utils.ExecutionPlacementError if arr.shape != mask.shape or vals.ndim != 1: raise ValueError("Array sizes are not as required") cumsum = dpt.empty(mask.size, dtype="i8", sycl_queue=exec_q) _manager = dpctl.utils.SequentialOrderManager[exec_q] deps_ev = _manager.submitted_events nz_count = ti.mask_positions( mask, cumsum, sycl_queue=exec_q, depends=deps_ev ) if nz_count == 0: return if vals.size == 0: raise ValueError("Cannot insert from an empty array!") if vals.dtype == arr.dtype: rhs = vals else: rhs = dpt.astype(vals, arr.dtype) hev, pl_ev = ti._place( dst=arr, cumsum=cumsum, axis_start=0, axis_end=mask.ndim, rhs=rhs, sycl_queue=exec_q, ) _manager.add_event_pair(hev, pl_ev)
[docs]def nonzero(arr): """nonzero(arr) Return the indices of non-zero elements. Returns a tuple of usm_ndarrays, one for each dimension of ``arr``, containing the indices of the non-zero elements in that dimension. The values of ``arr`` are always tested in row-major, C-style order. Args: arr (usm_ndarray): Input array, which has non-zero array rank. Returns: Tuple[usm_ndarray, ...]: Indices of non-zero array elements. """ if not isinstance(arr, dpt.usm_ndarray): raise TypeError( "Expecting dpctl.tensor.usm_ndarray type, " f"got {type(arr)}" ) if arr.ndim == 0: raise ValueError("Array of positive rank is expected") return _nonzero_impl(arr)
def _range(sh_i, i, nd, q, usm_t, dt): ind = dpt.arange(sh_i, dtype=dt, usm_type=usm_t, sycl_queue=q) ind.shape = tuple(sh_i if i == j else 1 for j in range(nd)) return ind
[docs]def take_along_axis(x, indices, /, *, axis=-1, mode="wrap"): """ Returns elements from an array at the one-dimensional indices specified by ``indices`` along a provided ``axis``. Args: x (usm_ndarray): input array. Must be compatible with ``indices``, except for the axis (dimension) specified by ``axis``. indices (usm_ndarray): array indices. Must have the same rank (i.e., number of dimensions) as ``x``. axis: int axis along which to select values. If ``axis`` is negative, the function determines the axis along which to select values by counting from the last dimension. Default: ``-1``. mode (str, optional): How out-of-bounds indices will be handled. Possible values are: - ``"wrap"``: clamps indices to (``-n <= i < n``), then wraps negative indices. - ``"clip"``: clips indices to (``0 <= i < n``). Default: ``"wrap"``. Returns: usm_ndarray: an array having the same data type as ``x``. The returned array has the same rank (i.e., number of dimensions) as ``x`` and a shape determined according to broadcasting rules, except for the axis (dimension) specified by ``axis`` whose size must equal the size of the corresponding axis (dimension) in ``indices``. Note: Treatment of the out-of-bound indices in ``indices`` array is controlled by the value of ``mode`` keyword. """ if not isinstance(x, dpt.usm_ndarray): raise TypeError(f"Expected dpctl.tensor.usm_ndarray, got {type(x)}") if not isinstance(indices, dpt.usm_ndarray): raise TypeError( f"Expected dpctl.tensor.usm_ndarray, got {type(indices)}" ) x_nd = x.ndim if x_nd != indices.ndim: raise ValueError( "Number of dimensions in the first and the second " "argument arrays must be equal" ) pp = normalize_axis_index(operator.index(axis), x_nd) out_usm_type = dpctl.utils.get_coerced_usm_type( (x.usm_type, indices.usm_type) ) exec_q = dpctl.utils.get_execution_queue((x.sycl_queue, indices.sycl_queue)) if exec_q is None: raise dpctl.utils.ExecutionPlacementError( "Execution placement can not be unambiguously inferred " "from input arguments. " ) mode_i = _get_indexing_mode(mode) indexes_dt = ti.default_device_index_type(exec_q.sycl_device) _ind = tuple( ( indices if i == pp else _range(x.shape[i], i, x_nd, exec_q, out_usm_type, indexes_dt) ) for i in range(x_nd) ) return _take_multi_index(x, _ind, 0, mode=mode_i)
[docs]def put_along_axis(x, indices, vals, /, *, axis=-1, mode="wrap"): """ Puts elements into an array at the one-dimensional indices specified by ``indices`` along a provided ``axis``. Args: x (usm_ndarray): input array. Must be compatible with ``indices``, except for the axis (dimension) specified by ``axis``. indices (usm_ndarray): array indices. Must have the same rank (i.e., number of dimensions) as ``x``. vals (usm_ndarray): Array of values to be put into ``x``. Must be broadcastable to the shape of ``indices``. axis: int axis along which to select values. If ``axis`` is negative, the function determines the axis along which to select values by counting from the last dimension. Default: ``-1``. mode (str, optional): How out-of-bounds indices will be handled. Possible values are: - ``"wrap"``: clamps indices to (``-n <= i < n``), then wraps negative indices. - ``"clip"``: clips indices to (``0 <= i < n``). Default: ``"wrap"``. .. note:: If input array ``indices`` contains duplicates, a race condition occurs, and the value written into corresponding positions in ``x`` may vary from run to run. Preserving sequential semantics in handing the duplicates to achieve deterministic behavior requires additional work. """ if not isinstance(x, dpt.usm_ndarray): raise TypeError(f"Expected dpctl.tensor.usm_ndarray, got {type(x)}") if not isinstance(indices, dpt.usm_ndarray): raise TypeError( f"Expected dpctl.tensor.usm_ndarray, got {type(indices)}" ) x_nd = x.ndim if x_nd != indices.ndim: raise ValueError( "Number of dimensions in the first and the second " "argument arrays must be equal" ) pp = normalize_axis_index(operator.index(axis), x_nd) if isinstance(vals, dpt.usm_ndarray): queues_ = [x.sycl_queue, indices.sycl_queue, vals.sycl_queue] usm_types_ = [x.usm_type, indices.usm_type, vals.usm_type] else: queues_ = [x.sycl_queue, indices.sycl_queue] usm_types_ = [x.usm_type, indices.usm_type] exec_q = dpctl.utils.get_execution_queue(queues_) if exec_q is None: raise dpctl.utils.ExecutionPlacementError( "Execution placement can not be unambiguously inferred " "from input arguments. " ) out_usm_type = dpctl.utils.get_coerced_usm_type(usm_types_) mode_i = _get_indexing_mode(mode) indexes_dt = ti.default_device_index_type(exec_q.sycl_device) _ind = tuple( ( indices if i == pp else _range(x.shape[i], i, x_nd, exec_q, out_usm_type, indexes_dt) ) for i in range(x_nd) ) return _put_multi_index(x, _ind, 0, vals, mode=mode_i)