Source code for dpctl._sycl_timer

#                      Data Parallel Control (dpctl)
#
# Copyright 2020-2025 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import timeit

from . import SyclQueue

__doc__ = "This module implements :class:`dpctl.SyclTimer`."


class HostDeviceDuration:
    def __init__(self, host_dt, device_dt):
        self._host_dt = host_dt
        self._device_dt = device_dt

    def __repr__(self):
        return f"(host_dt={self._host_dt}, device_dt={self._device_dt})"

    def __str__(self):
        return f"(host_dt={self._host_dt}, device_dt={self._device_dt})"

    def __iter__(self):
        yield from [self._host_dt, self._device_dt]

    @property
    def host_dt(self):
        return self._host_dt

    @property
    def device_dt(self):
        return self._device_dt


class BaseDeviceTimer:
    __slots__ = ["queue"]

    def __init__(self, sycl_queue):
        if not isinstance(sycl_queue, SyclQueue):
            raise TypeError(f"Expected type SyclQueue, got {type(sycl_queue)}")
        self.queue = sycl_queue


class QueueBarrierDeviceTimer(BaseDeviceTimer):
    __slots__ = []

    def __init__(self, sycl_queue):
        super(QueueBarrierDeviceTimer, self).__init__(sycl_queue)

    def get_event(self):
        return self.queue.submit_barrier()


class OrderManagerDeviceTimer(BaseDeviceTimer):
    __slots__ = ["_order_manager", "_submit_empty_task_fn"]

    def __init__(self, sycl_queue):
        import dpctl.utils._seq_order_keeper as s_ok
        from dpctl.utils import SequentialOrderManager as seq_om

        super(OrderManagerDeviceTimer, self).__init__(sycl_queue)
        self._order_manager = seq_om[self.queue]
        self._submit_empty_task_fn = s_ok._submit_empty_task

    def get_event(self):
        ev = self._submit_empty_task_fn(
            sycl_queue=self.queue, depends=self._order_manager.submitted_events
        )
        self._order_manager.add_event_pair(ev, ev)
        return ev


[docs]class SyclTimer: """ Context to time execution of tasks submitted to :class:`dpctl.SyclQueue`. :Example: .. code-block:: python import dpctl # Create a default SyclQueue q = dpctl.SyclQueue(property="enable_profiling") # create the timer milliseconds_sc = 1e3 timer = dpctl.SyclTimer(time_scale = milliseconds_sc) untimed_code_block_1 # use the timer with timer(queue=q): timed_code_block1 untimed_code_block_2 # use the timer with timer(queue=q): timed_code_block2 untimed_code_block_3 # retrieve elapsed times in milliseconds wall_dt, device_dt = timer.dt .. note:: The timer submits tasks to the queue at the entrance and the exit of the context and uses profiling information from events associated with these submissions to perform the timing. Thus :class:`dpctl.SyclTimer` requires the queue with ``"enable_profiling"`` property. In order to be able to collect the profiling information, the ``dt`` property ensures that both tasks submitted by the timer complete their execution and thus effectively synchronizes the queue. Execution of the above example results in the following task graph, where each group of tasks is ordered after the one preceding it, ``[tasks_of_untimed_block1]``, ``[timer_fence_start_task]``, ``[tasks_of_timed_block1]``, ``[timer_fence_finish_task]``, ``[tasks_of_untimed_block2]``, ``[timer_fence_start_task]``, ``[tasks_of_timed_block2]``, ``[timer_fence_finish_task]``, ``[tasks_of_untimed_block3]``. ``device_timer`` keyword argument controls the type of tasks submitted. With ``"queue_barrier"`` value, queue barrier tasks are used. With ``"order_manager"`` value, a single empty body task is inserted and order manager (used by all `dpctl.tensor` operations) is used to order these tasks so that they fence operations performed within timer's context. Timing offloading operations that do not use the order manager with the timer that uses ``"order_manager"`` as ``device_timer`` value will be misleading becaused the tasks submitted by the timer will not be ordered with respect to tasks we intend to time. Note, that host timer effectively measures the time of task submissions. To measure host timer wall-time that includes execution of submitted tasks, make sure to include synchronization point in the timed block. :Example: .. code-block:: python with timer(q): timed_block q.wait() Args: host_timer (callable, optional): A callable such that host_timer() returns current host time in seconds. Default: :py:func:`timeit.default_timer`. device_timer (Literal["queue_barrier", "order_manager"], optional): Device timing method. Default: "queue_barrier". time_scale (Union[int, float], optional): Ratio of one second and the unit of time-scale of interest. Default: ``1``. """
[docs] def __init__( self, host_timer=timeit.default_timer, device_timer=None, time_scale=1 ): """ Create new instance of :class:`.SyclTimer`. Args: host_timer (callable, optional) A function that takes no arguments and returns a value measuring time. Default: :meth:`timeit.default_timer`. device_timer (Literal["queue_barrier", "order_manager"], optional): Device timing method. Default: "queue_barrier" time_scale (Union[int, float], optional): Scaling factor applied to durations measured by the host_timer. Default: ``1``. """ self.timer = host_timer self.time_scale = time_scale self.queue = None self.host_times = [] self.bracketing_events = [] self._context_data = list() if device_timer is None: device_timer = "queue_barrier" if device_timer == "queue_barrier": self._device_timer_class = QueueBarrierDeviceTimer elif device_timer == "order_manager": self._device_timer_class = OrderManagerDeviceTimer else: raise ValueError( "Supported values for device_timer keyword are " "'queue_barrier', 'order_manager', got " f"'{device_timer}'" ) self._device_timer = None
def __call__(self, queue=None): if isinstance(queue, SyclQueue): if queue.has_enable_profiling: self.queue = queue self._device_timer = self._device_timer_class(queue) else: raise ValueError( "The given queue was not created with the " "enable_profiling property" ) else: raise TypeError( "The passed queue must have type dpctl.SyclQueue, " f"got {type(queue)}" ) return self def __enter__(self): _event_start = self._device_timer.get_event() _host_start = self.timer() self._context_data.append((_event_start, _host_start)) return self def __exit__(self, *args): _event_end = self._device_timer.get_event() _host_end = self.timer() _event_start, _host_start = self._context_data.pop() self.host_times.append((_host_start, _host_end)) self.bracketing_events.append((_event_start, _event_end)) @property def dt(self): """Returns a pair of elapsed times ``host_dt`` and ``device_dt``. The ``host_dt`` is the duration as measured by the host timer, while the ``device_dt`` is the duration as measured by the device timer and encoded in profiling events. Returns: HostDeviceDuration: Data class with ``host_dt`` and ``device_dt`` members which supports unpacking into a 2-tuple. :Example: .. code-block:: python import dpctl from dpctl import tensor q = dpctl.SyclQueue(property="enable_profiling") device = tensor.Device.create_device(q) timer = dpctl.SyclTimer() with timer(q): x = tensor.linspace(-4, 4, num=10**6, dtype="float32") e = tensor.exp(-0.5 * tensor.square(x)) s = tensor.sin(2.3 * x + 0.11) f = e * s host_dt, device_dt = timer.dt .. note:: Since different timers are used to measure host and device durations, one should not expect that ``host_dt`` is always strictly greater than ``device_dt``. Use tracing tools like ``onetrace``, or ``unitrace`` from `intel/pti-gpu <https://github.com/intel/pti-gpu>`_ repository for more accurate measurements. """ for es, ef in self.bracketing_events: es.wait() ef.wait() host_dt = sum(tf - ts for ts, tf in self.host_times) * self.time_scale dev_dt = sum( ef.profiling_info_start - es.profiling_info_end for es, ef in self.bracketing_events ) * (1e-9 * self.time_scale) return HostDeviceDuration(host_dt, dev_dt)