diff --git a/c/parallel/src/transform.cu b/c/parallel/src/transform.cu index 1c819855fdc..2fba09504b7 100644 --- a/c/parallel/src/transform.cu +++ b/c/parallel/src/transform.cu @@ -125,6 +125,9 @@ struct transform_kernel_source cub::detail::transform::cuda_expected CacheAsyncConfiguration(const ActionT& action) { +#if defined(CCCL_PYTHON_FREE_THREADED) + return action(); +#else // defined(CCCL_PYTHON_FREE_THREADED) auto cache = reinterpret_cast(build.cache); if (cache == nullptr) { @@ -135,12 +138,16 @@ struct transform_kernel_source cache->async_config = action(); } return *cache->async_config; +#endif // defined(CCCL_PYTHON_FREE_THREADED) } template cub::detail::transform::cuda_expected CachePrefetchConfiguration(const ActionT& action) { +#if defined(CCCL_PYTHON_FREE_THREADED) + return action(); +#else // defined(CCCL_PYTHON_FREE_THREADED) auto cache = reinterpret_cast(build.cache); if (cache == nullptr) { @@ -151,6 +158,7 @@ struct transform_kernel_source cache->prefetch_config = action(); } return *cache->prefetch_config; +#endif // defined(CCCL_PYTHON_FREE_THREADED) } CUkernel TransformKernel() const @@ -346,7 +354,9 @@ static_assert(device_transform_policy()(detail::current_tuning_cc()) == {9}, "Ho return CUDA_ERROR_OUT_OF_MEMORY; } std::memcpy(runtime_policy.get(), &policy_sel, sizeof(policy_sel)); - auto cache_obj = std::make_unique(); +#if !defined(CCCL_PYTHON_FREE_THREADED) + auto cache_obj = std::make_unique(); +#endif // !defined(CCCL_PYTHON_FREE_THREADED) auto kernel_name_copy = std::unique_ptr(duplicate_c_string(kernel_lowered_name)); build_ptr->loaded_bytes_per_iteration = static_cast(input_it.value_type.size); @@ -372,7 +382,11 @@ static_assert(device_transform_policy()(detail::current_tuning_cc()) == {9}, "Ho build_ptr->payload_kind = CCCL_PAYLOAD_CUBIN; } - build_ptr->cache = cache_obj.release(); +#if defined(CCCL_PYTHON_FREE_THREADED) + build_ptr->cache = nullptr; +#else // defined(CCCL_PYTHON_FREE_THREADED) + build_ptr->cache = cache_obj.release(); +#endif // defined(CCCL_PYTHON_FREE_THREADED) build_ptr->transform_kernel_lowered_name = kernel_name_copy.release(); build_ptr->runtime_policy = runtime_policy.release(); build_ptr->runtime_policy_size = sizeof(policy_sel); @@ -642,7 +656,9 @@ static_assert(device_transform_policy()(detail::current_tuning_cc()) == {12}, "H return CUDA_ERROR_OUT_OF_MEMORY; } std::memcpy(runtime_policy.get(), &policy_sel, sizeof(policy_sel)); - auto cache_obj = std::make_unique(); +#if !defined(CCCL_PYTHON_FREE_THREADED) + auto cache_obj = std::make_unique(); +#endif // !defined(CCCL_PYTHON_FREE_THREADED) auto kernel_name_copy = std::unique_ptr(duplicate_c_string(kernel_lowered_name)); build_ptr->loaded_bytes_per_iteration = static_cast((input1_it.value_type.size + input2_it.value_type.size)); @@ -668,7 +684,11 @@ static_assert(device_transform_policy()(detail::current_tuning_cc()) == {12}, "H build_ptr->payload_kind = CCCL_PAYLOAD_CUBIN; } - build_ptr->cache = cache_obj.release(); +#if defined(CCCL_PYTHON_FREE_THREADED) + build_ptr->cache = nullptr; +#else // defined(CCCL_PYTHON_FREE_THREADED) + build_ptr->cache = cache_obj.release(); +#endif // defined(CCCL_PYTHON_FREE_THREADED) build_ptr->transform_kernel_lowered_name = kernel_name_copy.release(); build_ptr->runtime_policy = runtime_policy.release(); build_ptr->runtime_policy_size = sizeof(policy_sel); diff --git a/ci/matrix.yaml b/ci/matrix.yaml index a4d8d8405bb..509684610b8 100644 --- a/ci/matrix.yaml +++ b/ci/matrix.yaml @@ -87,8 +87,10 @@ workflows: - {jobs: ['test'], project: 'cccl_c_stf', ctk: '13.X', cxx: 'gcc13', gpu: ['rtx2080', 'l4', 'h100']} # Python -- pinned to gcc13 / msvc2022 for consistency across CTK images - {jobs: ['test'], project: 'python', ctk: ['12.X', '13.X'], py_version: ['3.10'], gpu: 'l4', cxx: ['gcc13', 'msvc2022']} - - {jobs: ['test'], project: 'python', ctk: ['12.X','13.0', '13.X'], py_version: ['3.14'], gpu: 'l4', cxx: ['gcc13', 'msvc2022']} - - {jobs: ['test'], project: 'python', py_version: '3.14', gpu: 'h100', cxx: 'gcc13'} + - {jobs: ['test'], project: 'python', ctk: ['12.X','13.0', '13.X'], py_version: ['3.14', '3.14t'], gpu: 'l4', cxx: ['gcc13', 'msvc2022']} + - {jobs: ['test'], project: 'python', py_version: ['3.14', '3.14t'], gpu: 'h100', cxx: 'gcc13'} + - {jobs: ['test_py_compute_minimal'], project: 'python', ctk: '13.X', py_version: '3.14', gpu: 'l4', cxx: 'gcc13'} + - {jobs: ['test_py_compute_minimal'], project: 'python', ctk: '13.X', py_version: '3.14t', gpu: 'l4', cxx: 'gcc13'} # CCCL packaging: - {jobs: ['test'], project: 'packaging', ctk: '12.0', cxx: ['gcc10', 'clang14'], gpu: 'rtx2080', args: '-min-cmake'} - {jobs: ['test'], project: 'packaging', ctk: '12.X', cxx: ['gcc10', 'clang14'], gpu: 'rtx2080'} @@ -544,6 +546,7 @@ jobs: test_py_headers: { name: "Test cuda.cccl.headers", gpu: true, needs: 'build_py_wheel', force_producer_ctk: "pybuild", invoke: { prefix: 'test_cuda_cccl_headers'} } test_py_coop: { name: "Test cuda.coop._experimental", gpu: true, needs: 'build_py_wheel', force_producer_ctk: "pybuild", invoke: { prefix: 'test_cuda_coop'} } test_py_par: { name: "Test cuda.compute", gpu: true, needs: 'build_py_wheel', force_producer_ctk: "pybuild", invoke: { prefix: 'test_cuda_compute'} } + test_py_compute_minimal: { name: "Test cuda.compute minimal", gpu: true, needs: 'build_py_wheel', force_producer_ctk: "pybuild", invoke: { prefix: 'test_cuda_compute_minimal'} } test_py_examples: { name: "Test cuda.cccl.examples", gpu: true, needs: 'build_py_wheel', force_producer_ctk: "pybuild", invoke: { prefix: 'test_cuda_cccl_examples'} } # Run jobs for 'target' project (ci/util/build_and_test_targets.sh): diff --git a/ci/test_cuda_compute_minimal_python.sh b/ci/test_cuda_compute_minimal_python.sh new file mode 100755 index 00000000000..a43bbac7bfa --- /dev/null +++ b/ci/test_cuda_compute_minimal_python.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash + +set -euo pipefail + +ci_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +repo_root="$(cd "$ci_dir/.." && pwd)" +source "$ci_dir/pyenv_helper.sh" + +# Parse common arguments +source "$ci_dir/util/python/common_arg_parser.sh" +parse_python_args "$@" +require_py_version "Usage: $0 -py-version " + +cuda_major_version=$(nvcc --version | grep release | awk '{print $6}' | tr -d ',' | cut -d '.' -f 1 | cut -d 'V' -f 2) + +# Setup Python environment +setup_python_env "${py_version}" + +# Fetch or build the cuda_cccl wheel: +if [[ -n "${GITHUB_ACTIONS:-}" ]]; then + wheel_artifact_name=$("$ci_dir/util/workflow/get_wheel_artifact_name.sh") + "$ci_dir/util/artifacts/download.sh" "${wheel_artifact_name}" "${repo_root}/" + wheelhouse_dir="${repo_root}/wheelhouse" +else + "$ci_dir/build_cuda_cccl_python.sh" -py-version "${py_version}" + wheelhouse_dir="${repo_root}/wheelhouse" +fi + +# Install cuda_cccl with the minimal CUDA extra. This intentionally avoids the +# full cu* extras because those pull in numba/numba-cuda. +CUDA_CCCL_WHEEL_PATH="$(ls "${wheelhouse_dir}"/cuda_cccl-*.whl)" +python -m pip install "${CUDA_CCCL_WHEEL_PATH}[minimal-cu${cuda_major_version}]" +python -m pip install pytest pytest-xdist "cupy-cuda${cuda_major_version}x" + +cd "${repo_root}/python/cuda_cccl/tests/" +python -m pytest -n 6 -v compute/test_no_numba.py diff --git a/ci/test_cuda_compute_python.sh b/ci/test_cuda_compute_python.sh index bd6ad432178..c829e72518a 100755 --- a/ci/test_cuda_compute_python.sh +++ b/ci/test_cuda_compute_python.sh @@ -35,5 +35,5 @@ if [[ "${CCCL_PYTHON_USE_V2:-}" =~ ^(1|true|TRUE|on|ON)$ ]]; then fi cd "/home/coder/cccl/python/cuda_cccl/tests/" -python -m pytest "${pytest_extra[@]}" -n 6 -v compute/ -m "not large" -python -m pytest "${pytest_extra[@]}" -n 0 -v compute/ -m "large" +python -m pytest "${pytest_extra[@]}" -n 6 -v compute/ -m "not large and not free_threading" +python -m pytest "${pytest_extra[@]}" -n 0 -v compute/ -m "large and not free_threading" diff --git a/ci/windows/build_common_python.psm1 b/ci/windows/build_common_python.psm1 index dab9258761a..060da49d4bd 100644 --- a/ci/windows/build_common_python.psm1 +++ b/ci/windows/build_common_python.psm1 @@ -4,12 +4,13 @@ function Get-Python { Returns the path of the Python interpreter satisfying the supplied version, installing it via uv if necessary. .PARAMETER Version - A string in the form 'M.m' (e.g., '3.10', '3.13'). + A string in the form 'M.m' (e.g., '3.10', '3.13') or a free-threaded + version such as '3.14t'. #> [CmdletBinding()] param( [Parameter(Mandatory, Position = 0)] - [ValidatePattern('^\d+\.\d+$')] + [ValidatePattern('^\d+\.\d+t?$')] [string]$Version ) diff --git a/ci/windows/build_cuda_cccl_python.ps1 b/ci/windows/build_cuda_cccl_python.ps1 index 3d6dadd632a..3e825d38b9c 100644 --- a/ci/windows/build_cuda_cccl_python.ps1 +++ b/ci/windows/build_cuda_cccl_python.ps1 @@ -21,7 +21,8 @@ .PARAMETER PyVersion **Required.** The Python version to use for building the wheel, expressed - as `.` (e.g. `3.11`). + as `.` (e.g. `3.11`) or a free-threaded version such as + `3.14t`. .PARAMETER OnlyCudaMajor Optional. Restricts the build to a single CUDA major version (`12` or `13`). @@ -49,7 +50,7 @@ Param( [Parameter(Mandatory = $true)] [Alias("py-version")] - [ValidatePattern("^\d+\.\d+$")] + [ValidatePattern("^\d+\.\d+t?$")] [string]$PyVersion, [Parameter(Mandatory = $false)] diff --git a/ci/windows/test_cuda_cccl_examples_python.ps1 b/ci/windows/test_cuda_cccl_examples_python.ps1 index 0c108328822..8e50dc5d7bf 100644 --- a/ci/windows/test_cuda_cccl_examples_python.ps1 +++ b/ci/windows/test_cuda_cccl_examples_python.ps1 @@ -1,27 +1,27 @@ -Param( - [Parameter(Mandatory = $true)] - [Alias("py-version")] - [ValidatePattern("^\d+\.\d+$")] - [string]$PyVersion -) - -$ErrorActionPreference = "Stop" - -# Import shared helpers -Import-Module "$PSScriptRoot/build_common.psm1" -Import-Module "$PSScriptRoot/build_common_python.psm1" - -$python = Get-Python -Version $PyVersion -$cudaMajor = Get-CudaMajor - -$repoRoot = Get-RepoRoot - -${wheelPath} = Get-CudaCcclWheel -& $python -m pip install -U pip pytest pytest-xdist -& $python -m pip install "${wheelPath}[test-cu$cudaMajor]" - -Push-Location (Join-Path $repoRoot "python/cuda_cccl/tests") -try { - & $python -m pytest -n 6 test_examples.py -} -finally { Pop-Location } +Param( + [Parameter(Mandatory = $true)] + [Alias("py-version")] + [ValidatePattern("^\d+\.\d+t?$")] + [string]$PyVersion +) + +$ErrorActionPreference = "Stop" + +# Import shared helpers +Import-Module "$PSScriptRoot/build_common.psm1" +Import-Module "$PSScriptRoot/build_common_python.psm1" + +$python = Get-Python -Version $PyVersion +$cudaMajor = Get-CudaMajor + +$repoRoot = Get-RepoRoot + +${wheelPath} = Get-CudaCcclWheel +& $python -m pip install -U pip pytest pytest-xdist +& $python -m pip install "${wheelPath}[test-cu$cudaMajor]" + +Push-Location (Join-Path $repoRoot "python/cuda_cccl/tests") +try { + & $python -m pytest -n 6 test_examples.py +} +finally { Pop-Location } diff --git a/ci/windows/test_cuda_cccl_headers_python.ps1 b/ci/windows/test_cuda_cccl_headers_python.ps1 index 04a6adacc28..3a2fd40d51b 100644 --- a/ci/windows/test_cuda_cccl_headers_python.ps1 +++ b/ci/windows/test_cuda_cccl_headers_python.ps1 @@ -1,27 +1,27 @@ -Param( - [Parameter(Mandatory = $true)] - [Alias("py-version")] - [ValidatePattern("^\d+\.\d+$")] - [string]$PyVersion -) - -$ErrorActionPreference = "Stop" - -# Import shared helpers -Import-Module "$PSScriptRoot/build_common.psm1" -Import-Module "$PSScriptRoot/build_common_python.psm1" - -$python = Get-Python -Version $PyVersion -$cudaMajor = Get-CudaMajor - -$repoRoot = Get-RepoRoot - -${wheelPath} = Get-CudaCcclWheel -& $python -m pip install -U pip pytest pytest-xdist -& $python -m pip install "${wheelPath}[test-cu$cudaMajor]" - -Push-Location (Join-Path $repoRoot "python/cuda_cccl/tests") -try { - & $python -m pytest -n auto -v headers/ -} -finally { Pop-Location } +Param( + [Parameter(Mandatory = $true)] + [Alias("py-version")] + [ValidatePattern("^\d+\.\d+t?$")] + [string]$PyVersion +) + +$ErrorActionPreference = "Stop" + +# Import shared helpers +Import-Module "$PSScriptRoot/build_common.psm1" +Import-Module "$PSScriptRoot/build_common_python.psm1" + +$python = Get-Python -Version $PyVersion +$cudaMajor = Get-CudaMajor + +$repoRoot = Get-RepoRoot + +${wheelPath} = Get-CudaCcclWheel +& $python -m pip install -U pip pytest pytest-xdist +& $python -m pip install "${wheelPath}[test-cu$cudaMajor]" + +Push-Location (Join-Path $repoRoot "python/cuda_cccl/tests") +try { + & $python -m pytest -n auto -v headers/ +} +finally { Pop-Location } diff --git a/ci/windows/test_cuda_compute_python.ps1 b/ci/windows/test_cuda_compute_python.ps1 index 796d5128141..f8a9f2f7509 100644 --- a/ci/windows/test_cuda_compute_python.ps1 +++ b/ci/windows/test_cuda_compute_python.ps1 @@ -1,29 +1,29 @@ -Param( - [Parameter(Mandatory = $true)] - [Alias("py-version")] - [ValidatePattern("^\d+\.\d+$")] - [string]$PyVersion -) - -$ErrorActionPreference = "Stop" - -# Import shared helpers -Import-Module "$PSScriptRoot/build_common.psm1" -Import-Module "$PSScriptRoot/build_common_python.psm1" - -$python = Get-Python -Version $PyVersion -$cudaMajor = Get-CudaMajor - -$repoRoot = Get-RepoRoot - -$wheelPath = Get-CudaCcclWheel - -& $python -m pip install -U pip pytest pytest-xdist -& $python -m pip install "$wheelPath[test-cu$cudaMajor]" - -Push-Location (Join-Path $repoRoot "python/cuda_cccl/tests") -try { - & $python -m pytest -n 6 -v compute/ -m "not large" - & $python -m pytest -n 0 -v compute/ -m "large" -} -finally { Pop-Location } +Param( + [Parameter(Mandatory = $true)] + [Alias("py-version")] + [ValidatePattern("^\d+\.\d+t?$")] + [string]$PyVersion +) + +$ErrorActionPreference = "Stop" + +# Import shared helpers +Import-Module "$PSScriptRoot/build_common.psm1" +Import-Module "$PSScriptRoot/build_common_python.psm1" + +$python = Get-Python -Version $PyVersion +$cudaMajor = Get-CudaMajor + +$repoRoot = Get-RepoRoot + +$wheelPath = Get-CudaCcclWheel + +& $python -m pip install -U pip pytest pytest-xdist +& $python -m pip install "$wheelPath[test-cu$cudaMajor]" + +Push-Location (Join-Path $repoRoot "python/cuda_cccl/tests") +try { + & $python -m pytest -n 6 -v compute/ -m "not large and not free_threading" + & $python -m pytest -n 0 -v compute/ -m "large and not free_threading" +} +finally { Pop-Location } diff --git a/ci/windows/test_cuda_coop_python.ps1 b/ci/windows/test_cuda_coop_python.ps1 index 7fb5f9628bc..b0168ffcb32 100644 --- a/ci/windows/test_cuda_coop_python.ps1 +++ b/ci/windows/test_cuda_coop_python.ps1 @@ -1,25 +1,25 @@ -Param( - [Parameter(Mandatory = $true)] - [Alias("py-version")] - [ValidatePattern("^\d+\.\d+$")] - [string]$PyVersion -) - -$ErrorActionPreference = "Stop" - -# Import shared helpers -Import-Module "$PSScriptRoot/build_common.psm1" -Import-Module "$PSScriptRoot/build_common_python.psm1" - -$python = Get-Python -Version $PyVersion -$cudaMajor = Get-CudaMajor - -${wheelPath} = Get-CudaCcclWheel -& $python -m pip install -U pip pytest pytest-xdist -& $python -m pip install "${wheelPath}[test-cu$cudaMajor]" - -Push-Location (Join-Path (Get-RepoRoot) "python/cuda_cccl/tests") -try { - & $python -m pytest -n auto -v coop/_experimental/ -} -finally { Pop-Location } +Param( + [Parameter(Mandatory = $true)] + [Alias("py-version")] + [ValidatePattern("^\d+\.\d+t?$")] + [string]$PyVersion +) + +$ErrorActionPreference = "Stop" + +# Import shared helpers +Import-Module "$PSScriptRoot/build_common.psm1" +Import-Module "$PSScriptRoot/build_common_python.psm1" + +$python = Get-Python -Version $PyVersion +$cudaMajor = Get-CudaMajor + +${wheelPath} = Get-CudaCcclWheel +& $python -m pip install -U pip pytest pytest-xdist +& $python -m pip install "${wheelPath}[test-cu$cudaMajor]" + +Push-Location (Join-Path (Get-RepoRoot) "python/cuda_cccl/tests") +try { + & $python -m pytest -n auto -v coop/_experimental/ +} +finally { Pop-Location } diff --git a/docs/python/compute/developer_overview.rst b/docs/python/compute/developer_overview.rst index c6d30971b01..6cec0b38e5f 100644 --- a/docs/python/compute/developer_overview.rst +++ b/docs/python/compute/developer_overview.rst @@ -451,6 +451,176 @@ as an example: At this point, the kernels stored in the reduction object are launched and the reduction is performed. +Caching and free-threaded Python +-------------------------------- + +The user-facing cache behavior is described in :ref:`cuda.compute.caching`. This +section describes the implementation contracts that keep that behavior correct +for free-threaded Python and multi-GPU use. + +Design requirements ++++++++++++++++++++ + +The free-threading design is constrained by the following requirements: + +* Importing ``cuda.compute`` in a free-threaded CPython interpreter must not + re-enable the GIL. +* Free-threading support should not add global locking or shared-state + contention to the normal single-threaded execution path. Wrapper cache hits + should be thread-local, and normal algorithm execution should not take a + global cache lock. +* Mutable wrapper state must not be shared across threads. +* Expensive native build results should still be shared across threads when they + are safe to share. +* Same-key concurrent cold builds should build once; waiters should receive the + same result or observe the same exception. + +Build and validation requirements ++++++++++++++++++++++++++++++++++ + +The Cython extension that backs ``cuda.compute`` must opt in to free-threaded +execution: + +.. code-block:: cython + + # cython: freethreading_compatible=True + +Without this marker, importing the extension in a free-threaded CPython process +can cause CPython to re-enable the GIL. The generated extension should advertise +``Py_MOD_GIL_NOT_USED`` and importing ``cuda.compute`` should leave +``sys._is_gil_enabled()`` false. + +The free-threaded wheel must also keep its free-threaded ABI tag after repair and +merge steps. For CPython 3.14, the expected wheel tag contains +``cp314-cp314t`` rather than the regular ``cp314-cp314`` tag. The acceptance +criteria for a free-threaded build are: + +* the wheel has the expected ``cp314-cp314t`` ABI tag; +* importing ``cuda.compute`` does not re-enable the GIL; +* the free-threading stress suite passes without forcing ``PYTHON_GIL=0`` or + ``-X gil=0``. + +Two cache layers +++++++++++++++++ + +Internally, ``cuda.compute`` separates two kinds of cached state: + +* **Wrapper objects** are the Python objects returned by ``make_*`` APIs, such as + ``make_reduce_into``. They own per-call descriptor state and are cached per + Python thread by ``cache_with_registered_key_functions`` in + ``cuda/compute/_caching.py``. Keeping wrapper caches thread-local avoids + sharing mutable wrapper state across concurrent calls from free-threaded + Python. +* **Build results** are the Cython objects that own the native C parallel build + state, such as loaded CUDA libraries, kernels, policy state, and other + read-only data needed to invoke an algorithm. They are cached by + ``cache_build_result`` and may be shared by wrapper objects in different + Python threads. + +The normal cache-hit path is intentionally cheap. A wrapper-cache hit is +thread-local and does not take the shared build-cache lock. The shared +build-cache lock is used when constructing a wrapper that needs to look up, +coordinate, or create a native build result, not during ordinary execution of an +already-returned wrapper object. + +Device keying ++++++++++++++ + +Both cache layers include the current CUDA runtime device ordinal and compute +capability in their keys. The compute capability identifies the architecture used +for code generation and policy selection. The device ordinal keeps native build +state associated with the device on which it was built. + +The first implementation intentionally keys shared build results by CUDA runtime +device ordinal rather than by CUDA context handle. User-managed CUDA driver +contexts are not a target use case for ``cuda.compute``. CUDA runtime, +``cuda.core``, CuPy, and PyTorch-style applications are expected to use the +primary-context model, and language frontends generally prefer that model. + +The first implementation also does not share build results across devices that +happen to have the same compute capability. Native build results are not treated +as pure SM-level code artifacts. They can contain CUDA-facing build/load state, +and CUB launch paths may resolve a ``CUkernel`` to the current-context +``CUfunction`` before occupancy queries or launch. Some paths also get or set +kernel attributes on the resolved function, and CUDA kernel-attribute behavior +is device-specific. Until every build-result path is audited for same-SM +cross-device sharing, separate device ordinals build and cache separate native +results. + +Concurrent build coordination ++++++++++++++++++++++++++++++ + +``cache_build_result`` is responsible for coordinating concurrent cache misses. +The first thread to miss a build-result key runs the builder, while other +threads wait for that in-flight build to complete. If the build succeeds, all +waiting threads receive the same cached build result. If it fails, the exception +is propagated to the waiting threads and the failed build is not stored in the +cache. + +When adding a new algorithm, the factory that returns the reusable wrapper object +should use ``cache_with_registered_key_functions``. The wrapper constructor +should pass the expensive native build operation to ``cache_build_result`` if +that native state is safe to share across threads. Do not perform an expensive +native build before entering ``cache_build_result``; otherwise same-key cold +factory calls can duplicate the build and bypass single-flight coordination. + +The specialization key must include every argument that can affect generated +code, type layout, policy selection, or native build state. It should not include +runtime-only values such as array pointers, array contents, item counts, streams, +or temporary-storage pointers unless those values change the compiled interface. + +User-object and descriptor contracts +++++++++++++++++++++++++++++++++++++ + +Wrapper objects returned by ``make_*`` APIs are not thread-reentrant. If two +threads need the same algorithm specialization, each thread should call the +factory and receive its own wrapper object, or the caller must externally +serialize access to a shared wrapper. The wrapper updates its Cython +``Iterator``, ``Op``, ``Value``, and algorithm-specific descriptors before each +native call, so concurrent calls through the same wrapper could overwrite the +descriptor state another thread is about to use. + +Read-only iterator and operator objects may be shared across threads. The +iterator base class uses a per-iterator lock for first-time lazy construction of +advance, input-dereference, and output-dereference ``Op`` objects; cached access +after that remains lock-free. This lock does not make arbitrary mutation safe: +concurrent mutation of iterator state, operator state, captured state, or child +iterators remains unsupported unless the caller synchronizes externally. + +Mutable execution state belongs to one thread at a time unless the caller +provides synchronization. This includes output arrays, temporary-storage buffers, +streams, ``DoubleBuffer`` instances, and other objects whose state changes as +part of a launch. + +Backend-specific notes +++++++++++++++++++++++ + +The v1 NVRTC/nvJitLink backend and the v2 HostJIT backend have different +free-threading risk surfaces and must be audited independently. v1 stresses +NVRTC, nvJitLink, CUDA library loading, and CUB host dispatch. v2 adds HostJIT +compiler state, LLVM/Clang initialization, persistent PCH paths, generated +source/cubin artifacts, and dynamic loader lifetime. + +Transform has one additional v1 native-cache rule. In CPython 3.14 +free-threaded builds, ``python/cuda_cccl/CMakeLists.txt`` defines +``CCCL_PYTHON_FREE_THREADED`` for the bundled C parallel target, and +``c/parallel/src/transform.cu`` uses that macro to bypass the native +``async_config`` / ``prefetch_config`` cache. Normal non-free-threaded builds +keep the existing lazy native cache path. This avoids adding launch-path locking +for transform in free-threaded Python builds while preserving the existing +single-threaded behavior elsewhere. + +Clearing caches ++++++++++++++++ + +``clear_all_caches()`` is process-local. It clears all known per-thread wrapper +caches through a weak registry of live thread cache containers, and it clears the +shared build-result cache. Separate Python processes build and cache +independently. + +Calling ``clear_all_caches()`` concurrently with active factory calls or +algorithm execution is not supported unless the caller synchronizes externally. + For readers who want to connect this overview back to the source tree: diff --git a/docs/python/compute/index.rst b/docs/python/compute/index.rst index ba8519a8352..e5f82bb8386 100644 --- a/docs/python/compute/index.rst +++ b/docs/python/compute/index.rst @@ -250,10 +250,14 @@ When working with structured data, there are two common memory layouts: Caching ------- -Algorithms in ``cuda.compute`` are compiled to GPU code at runtime. To avoid -recompiling on every call, build results are cached in memory. When you invoke -an algorithm with the same configuration—same dtypes, iterator kinds, operator, -and compute capability—the cached build is reused. +Algorithms in ``cuda.compute`` are compiled to GPU code at runtime. To +avoid recompiling on every call, build results are cached in memory. +When you invoke an algorithm with the same configuration—same dtypes, +iterator kinds, operator, compute capability, and current device—the +cached build is reused. On systems with multiple GPUs, builds may be +cached separately for each GPU. When free-threaded Python is enabled, +compiled build results may be reused by multiple threads in the same +process. What determines the cache key +++++++++++++++++++++++++++++ @@ -265,12 +269,44 @@ Each algorithm computes a cache key from: * **Operator identity** — for user-defined functions, the function's bytecode, constants, and closure contents (see below) * **Compute capability** — the GPU architecture of the current device +* **Current device** — the CUDA device active when the algorithm is built * **Algorithm-specific parameters** — such as initial value dtype or determinism mode Note that array *contents* or *pointers* are not part of the cache key—only the array's dtype. This means you can reuse a cached algorithm across different arrays of the same type. +Multi-GPU behavior +++++++++++++++++++ + +Cached builds are device-specific. If the same algorithm configuration is used +on multiple GPUs, ``cuda.compute`` may compile and cache a separate build for +each device. Set the intended current CUDA device before constructing or invoking +an algorithm, and pass arrays that are valid on that device. + +Free-threaded Python +++++++++++++++++++++ + +When ``cuda.compute`` is built for a free-threaded Python interpreter, +independent calls from multiple Python threads can reuse compiled build results +within the same process. + +The cache is local to the current Python process. Separate Python processes build +and cache independently, even if they use the same GPU and algorithm +configuration. + +This does not make user-provided memory or CUDA work automatically safe to share. +Users are still responsible for avoiding data races, such as two threads writing +to the same output array at the same time. Read-only iterator and operator +objects may be shared across threads, but concurrent mutation of those objects, +captured state, or underlying arrays requires external synchronization. For +concurrent use, prefer the direct +algorithm APIs, such as +:func:`reduce_into `, or create a separate +reusable algorithm object in each thread (for example, the object returned by +:func:`make_reduce_into `). If multiple +threads share one of these objects, serialize access to that object. + How user-defined functions are cached +++++++++++++++++++++++++++++++++++++ diff --git a/python/cuda_cccl/CMakeLists.txt b/python/cuda_cccl/CMakeLists.txt index 09044f19442..905087a7f42 100644 --- a/python/cuda_cccl/CMakeLists.txt +++ b/python/cuda_cccl/CMakeLists.txt @@ -83,6 +83,25 @@ install( # Build and install Cython extension find_package(Python3 COMPONENTS Interpreter Development.Module REQUIRED) +set( + _python_gil_disabled_query + "import sysconfig; print('1' if sysconfig.get_config_var('Py_GIL_DISABLED') in (1, '1') else '0')" +) +execute_process( + COMMAND "${Python3_EXECUTABLE}" -c "${_python_gil_disabled_query}" + OUTPUT_VARIABLE _python_gil_disabled + RESULT_VARIABLE _python_gil_disabled_result + OUTPUT_STRIP_TRAILING_WHITESPACE +) +if (NOT _python_gil_disabled_result EQUAL 0) + message(FATAL_ERROR "Failed to query Py_GIL_DISABLED from ${Python3_EXECUTABLE}") +endif() + +if (Python3_VERSION_MAJOR EQUAL 3 AND Python3_VERSION_MINOR EQUAL 14 AND "${_python_gil_disabled}" STREQUAL "1") + target_compile_definitions(${_cccl_c_parallel_target} PRIVATE CCCL_PYTHON_FREE_THREADED=1) + message(STATUS "Enabling CCCL_PYTHON_FREE_THREADED for Python 3.14t") +endif() + get_filename_component(_python_path "${Python3_EXECUTABLE}" PATH) set(CYTHON_version_command "${Python3_EXECUTABLE}" -m cython --version) diff --git a/python/cuda_cccl/cuda/compute/_bindings_impl.pyx b/python/cuda_cccl/cuda/compute/_bindings_impl.pyx index 45f48eab053..b97a500aed3 100644 --- a/python/cuda_cccl/cuda/compute/_bindings_impl.pyx +++ b/python/cuda_cccl/cuda/compute/_bindings_impl.pyx @@ -6,6 +6,7 @@ # distutils: language = c++ # cython: language_level=3 # cython: linetrace=True +# cython: freethreading_compatible=True # Python signatures are declared in the companion Python stub file _bindings.pyi # Make sure to update PYI with change to Python API to ensure that Python diff --git a/python/cuda_cccl/cuda/compute/_caching.py b/python/cuda_cccl/cuda/compute/_caching.py index 034292e76e9..8b8ef103957 100644 --- a/python/cuda_cccl/cuda/compute/_caching.py +++ b/python/cuda_cccl/cuda/compute/_caching.py @@ -6,15 +6,14 @@ from __future__ import annotations import functools +import threading import types +import weakref from typing import Any, Callable, Hashable import numpy as np -try: - from cuda.core import Device -except ImportError: - from cuda.core.experimental import Device +from cuda.core import Device from ._utils.protocols import get_dtype, get_shape, is_device_array from .struct import _Struct @@ -93,6 +92,150 @@ def _make_cache_key_from_args(*args, **kwargs) -> tuple: _cache_registry: dict[str, object] = {} +class _ThreadLocalCaches: + """ + Container for wrapper caches owned by a single Python thread. + + Each thread gets its own instance via ``threading.local()``. We use + ``__weakref__`` to enable the process-wide registry of caches to hold weak + references to the thread's caches. That way, if a thread exits, its caches + will be garbage collected and removed from the registry even if the + process-wide registry still references them. + """ + + __slots__ = ("wrapper_caches", "__weakref__") + + def __init__(self) -> None: + # Outer key: decorated algorithm factory name. Inner key: current thread + # id, current CUDA runtime device ordinal, compute capability, and + # specialization key derived from factory arguments. + self.wrapper_caches: dict[str, dict[Hashable, Any]] = {} + + +class _InFlightBuild: + """ + Coordination state for one shared build-result currently being built. + + The first thread for a cache key runs the builder. Other threads wait on + ``condition`` and receive either the completed build result or the builder's + exception. + """ + + def __init__(self) -> None: + self.condition = threading.Condition() + self.done = False + self.result: Any = None + self.exception: BaseException | None = None + + +_thread_local = threading.local() +# Process wide registry of per-thread caches. It enables a thread to call +# clear_all_caches() to clear all caches across all threads. +_thread_cache_registry: weakref.WeakSet[_ThreadLocalCaches] = weakref.WeakSet() +_thread_cache_registry_lock = threading.Lock() + +_shared_build_cache: dict[Hashable, Any] = {} +_in_flight_builds: dict[Hashable, _InFlightBuild] = {} +_shared_build_cache_lock = threading.Lock() + + +def _get_current_device_info() -> tuple[int, tuple[int, int]]: + device = Device() + cc = device.compute_capability + return device.device_id, (cc.major, cc.minor) + + +def _get_thread_caches() -> _ThreadLocalCaches: + caches = getattr(_thread_local, "caches", None) + if caches is None: + caches = _ThreadLocalCaches() + _thread_local.caches = caches + with _thread_cache_registry_lock: + _thread_cache_registry.add(caches) + return caches + + +def _clear_wrapper_caches(cache_name: str | None = None) -> None: + with _thread_cache_registry_lock: + thread_caches = list(_thread_cache_registry) + + for caches in thread_caches: + if cache_name is None: + caches.wrapper_caches.clear() + else: + caches.wrapper_caches.pop(cache_name, None) + + +def cache_build_result( + build_result_type: type, + *key_args, + builder: Callable[[], Any], +) -> Any: + """ + Cache a shared Cython build-result object for the current CUDA device. + + The key intentionally excludes the current Python thread. Wrappers are + cached per thread, but build results are shared across threads for the same + device ordinal and specialization key. + + Args: + build_result_type: Cython build-result type. This separates different + build-result caches that may otherwise have identical specialization + keys. + *key_args: Positional values used to form the specialization part of + the cache key. + builder: Callable that creates the build result on a cache miss. + Exactly one thread runs this callable for a given key while other + threads wait for the result. + + Returns: + The cached or newly built Cython build-result object. + """ + device_id, cc_key = _get_current_device_info() + user_cache_key = _make_cache_key_from_args(*key_args) + cache_key = (build_result_type, device_id, cc_key, user_cache_key) + + with _shared_build_cache_lock: + if cache_key in _shared_build_cache: + return _shared_build_cache[cache_key] + + in_flight = _in_flight_builds.get(cache_key) + if in_flight is None: + in_flight = _InFlightBuild() + _in_flight_builds[cache_key] = in_flight + is_builder = True + else: + is_builder = False + + if is_builder: + try: + result = builder() + except BaseException as exc: + with _shared_build_cache_lock: + _in_flight_builds.pop(cache_key, None) + with in_flight.condition: + in_flight.exception = exc + in_flight.done = True + in_flight.condition.notify_all() + raise + + with _shared_build_cache_lock: + _shared_build_cache[cache_key] = result + _in_flight_builds.pop(cache_key, None) + with in_flight.condition: + in_flight.result = result + in_flight.done = True + in_flight.condition.notify_all() + return result + + with in_flight.condition: + while not in_flight.done: + in_flight.condition.wait() + if in_flight.exception is not None: + raise in_flight.exception + return in_flight.result + + class _CacheWithRegisteredKeyFunctions: """ Decorator to cache the result of the decorated function. @@ -113,19 +256,21 @@ def __call__(self, func: Callable) -> Callable: The CUDA compute capability of the current device is appended to the cache key. """ - cache: dict = {} + cache_name = func.__qualname__ @functools.wraps(func) def inner(*args, **kwargs): - cc = Device().compute_capability + device_id, cc_key = _get_current_device_info() user_cache_key = _make_cache_key_from_args(*args, **kwargs) - cache_key = (user_cache_key, tuple(cc)) + cache_key = (threading.get_ident(), device_id, cc_key, user_cache_key) + thread_caches = _get_thread_caches() + cache = thread_caches.wrapper_caches.setdefault(cache_name, {}) if cache_key not in cache: result = func(*args, **kwargs) cache[cache_key] = result return cache[cache_key] - inner.cache_clear = cache.clear # type: ignore[attr-defined] + inner.cache_clear = lambda: _clear_wrapper_caches(cache_name) # type: ignore[attr-defined] # Register the cache in the central registry _cache_registry[func.__qualname__] = inner @@ -184,8 +329,10 @@ def clear_all_caches(): >>> import cuda.compute >>> cuda.compute.clear_all_caches() """ - for cached_func in _cache_registry.values(): - cached_func.cache_clear() + _clear_wrapper_caches() + with _shared_build_cache_lock: + _shared_build_cache.clear() + _in_flight_builds.clear() class CachableFunction: diff --git a/python/cuda_cccl/cuda/compute/algorithms/_binary_search.py b/python/cuda_cccl/cuda/compute/algorithms/_binary_search.py index 23a99c7bc40..657ec6fbfa4 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_binary_search.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_binary_search.py @@ -9,7 +9,7 @@ from .. import _bindings, types from .. import _cccl_interop as cccl -from .._caching import cache_with_registered_key_functions +from .._caching import cache_build_result, cache_with_registered_key_functions from .._cccl_interop import call_build, set_cccl_iterator_state from .._utils import protocols from ..op import OpAdapter, OpKind, make_op_adapter @@ -58,13 +58,21 @@ def __init__( self.op_cccl = comp.compile((data_value_type, data_value_type), types.uint8) - self.build_result = call_build( + self.build_result = cache_build_result( _bindings.DeviceBinarySearchBuildResult, + d_data, + d_values, + d_out, + comp, mode, - self.d_data_cccl, - self.d_values_cccl, - self.d_out_cccl, - self.op_cccl, + builder=lambda: call_build( + _bindings.DeviceBinarySearchBuildResult, + mode, + self.d_data_cccl, + self.d_values_cccl, + self.d_out_cccl, + self.op_cccl, + ), ) def __call__( diff --git a/python/cuda_cccl/cuda/compute/algorithms/_histogram.py b/python/cuda_cccl/cuda/compute/algorithms/_histogram.py index f865a767dab..6b406989ef2 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_histogram.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_histogram.py @@ -11,7 +11,7 @@ from .. import _bindings from .. import _cccl_interop as cccl -from .._caching import cache_with_registered_key_functions +from .._caching import cache_build_result, cache_with_registered_key_functions from .._cccl_interop import call_build, set_cccl_iterator_state, to_cccl_value_state from .._utils.protocols import get_data_pointer, validate_and_get_stream from .._utils.temp_storage_buffer import TempStorageBuffer @@ -51,17 +51,28 @@ def __init__( self.h_lower_level_cccl = cccl.to_cccl_value(h_lower_level) self.h_upper_level_cccl = cccl.to_cccl_value(h_upper_level) - self.build_result = call_build( + self.build_result = cache_build_result( _bindings.DeviceHistogramBuildResult, - num_channels, - num_active_channels, - self.d_samples_cccl, - num_levels, - self.d_histogram_cccl, - self.h_lower_level_cccl, - self.num_rows, - row_stride_samples, + d_samples, + d_histogram, + int(num_levels), + h_lower_level[0].item(), + h_upper_level[0].item(), + h_lower_level.dtype, + num_samples, is_evenly_segmented, + builder=lambda: call_build( + _bindings.DeviceHistogramBuildResult, + num_channels, + num_active_channels, + self.d_samples_cccl, + num_levels, + self.d_histogram_cccl, + self.h_lower_level_cccl, + self.num_rows, + row_stride_samples, + is_evenly_segmented, + ), ) def __call__( diff --git a/python/cuda_cccl/cuda/compute/algorithms/_reduce.py b/python/cuda_cccl/cuda/compute/algorithms/_reduce.py index d9c20cad2dc..d41b5223cb6 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_reduce.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_reduce.py @@ -11,7 +11,7 @@ from .. import _bindings from .. import _cccl_interop as cccl -from .._caching import cache_with_registered_key_functions +from .._caching import cache_build_result, cache_with_registered_key_functions from .._cccl_interop import ( call_build, get_value_type, @@ -59,13 +59,21 @@ def __init__( value_type = get_value_type(h_init) self.op_cccl = op.compile((value_type, value_type), value_type) - self.build_result = call_build( + self.build_result = cache_build_result( _bindings.DeviceReduceBuildResult, - self.d_in_cccl, - self.d_out_cccl, - self.op_cccl, - self.h_init_cccl, + d_in, + d_out, + op, + h_init, determinism, + builder=lambda: call_build( + _bindings.DeviceReduceBuildResult, + self.d_in_cccl, + self.d_out_cccl, + self.op_cccl, + self.h_init_cccl, + determinism, + ), ) match determinism: diff --git a/python/cuda_cccl/cuda/compute/algorithms/_scan.py b/python/cuda_cccl/cuda/compute/algorithms/_scan.py index bc7ecd4c587..21d78b342ad 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_scan.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_scan.py @@ -11,7 +11,7 @@ from .. import _bindings from .. import _cccl_interop as cccl -from .._caching import cache_with_registered_key_functions +from .._caching import cache_build_result, cache_with_registered_key_functions from .._cccl_interop import ( call_build, get_value_type, @@ -89,14 +89,23 @@ def __init__( # Compile the op with value types self.op_cccl = op.compile((value_type, value_type), value_type) - self.build_result = call_build( + self.build_result = cache_build_result( _bindings.DeviceScanBuildResult, - self.d_in_cccl, - self.d_out_cccl, - self.op_cccl, - init_value_type_info, + d_in, + d_out, + op, + init_value, force_inclusive, self.init_kind, + builder=lambda: call_build( + _bindings.DeviceScanBuildResult, + self.d_in_cccl, + self.d_out_cccl, + self.op_cccl, + init_value_type_info, + force_inclusive, + self.init_kind, + ), ) match (force_inclusive, self.init_kind): diff --git a/python/cuda_cccl/cuda/compute/algorithms/_segmented_reduce.py b/python/cuda_cccl/cuda/compute/algorithms/_segmented_reduce.py index 74b593f9944..5edfa5e0312 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_segmented_reduce.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_segmented_reduce.py @@ -11,7 +11,7 @@ from .. import _bindings from .. import _cccl_interop as cccl -from .._caching import cache_with_registered_key_functions +from .._caching import cache_build_result, cache_with_registered_key_functions from .._cccl_interop import ( call_build, get_value_type, @@ -58,14 +58,23 @@ def __init__( self.op_cccl = op.compile((value_type, value_type), value_type) - self.build_result = call_build( + self.build_result = cache_build_result( _bindings.DeviceSegmentedReduceBuildResult, - self.d_in_cccl, - self.d_out_cccl, - self.start_offsets_in_cccl, - self.end_offsets_in_cccl, - self.op_cccl, - self.h_init_cccl, + d_in, + d_out, + start_offsets_in, + end_offsets_in, + op, + h_init, + builder=lambda: call_build( + _bindings.DeviceSegmentedReduceBuildResult, + self.d_in_cccl, + self.d_out_cccl, + self.start_offsets_in_cccl, + self.end_offsets_in_cccl, + self.op_cccl, + self.h_init_cccl, + ), ) def __call__( diff --git a/python/cuda_cccl/cuda/compute/algorithms/_sort/_merge_sort.py b/python/cuda_cccl/cuda/compute/algorithms/_sort/_merge_sort.py index 1070042a4c4..ace448bd511 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_sort/_merge_sort.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_sort/_merge_sort.py @@ -8,7 +8,7 @@ from ... import _bindings, types from ... import _cccl_interop as cccl -from ..._caching import cache_with_registered_key_functions +from ..._caching import cache_build_result, cache_with_registered_key_functions from ..._cccl_interop import call_build, set_cccl_iterator_state from ..._utils.protocols import ( get_data_pointer, @@ -52,13 +52,21 @@ def __init__( value_type = cccl.get_value_type(d_in_keys) self.op_cccl = op.compile((value_type, value_type), types.int8) - self.build_result = call_build( + self.build_result = cache_build_result( _bindings.DeviceMergeSortBuildResult, - self.d_in_keys_cccl, - self.d_in_values_cccl, - self.d_out_keys_cccl, - self.d_out_values_cccl, - self.op_cccl, + d_in_keys, + d_in_values, + d_out_keys, + d_out_values, + op, + builder=lambda: call_build( + _bindings.DeviceMergeSortBuildResult, + self.d_in_keys_cccl, + self.d_in_values_cccl, + self.d_out_keys_cccl, + self.d_out_values_cccl, + self.op_cccl, + ), ) def __call__( diff --git a/python/cuda_cccl/cuda/compute/algorithms/_sort/_radix_sort.py b/python/cuda_cccl/cuda/compute/algorithms/_sort/_radix_sort.py index d09dcd0a79a..cf2af440135 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_sort/_radix_sort.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_sort/_radix_sort.py @@ -7,7 +7,7 @@ from ... import _bindings from ... import _cccl_interop as cccl -from ..._caching import cache_with_registered_key_functions +from ..._caching import cache_build_result, cache_with_registered_key_functions from ..._cccl_interop import call_build, set_cccl_iterator_state from ..._utils.protocols import ( get_data_pointer, @@ -56,15 +56,26 @@ def __init__( ) decomposer_return_type = "".encode("utf-8") - self.build_result = call_build( - _bindings.DeviceRadixSortBuildResult, + build_order = ( _bindings.SortOrder.ASCENDING if order is SortOrder.ASCENDING - else _bindings.SortOrder.DESCENDING, - self.d_in_keys_cccl, - self.d_in_values_cccl, - self.decomposer_op, - decomposer_return_type, + else _bindings.SortOrder.DESCENDING + ) + self.build_result = cache_build_result( + _bindings.DeviceRadixSortBuildResult, + d_in_keys, + d_out_keys, + d_in_values, + d_out_values, + order, + builder=lambda: call_build( + _bindings.DeviceRadixSortBuildResult, + build_order, + self.d_in_keys_cccl, + self.d_in_values_cccl, + self.decomposer_op, + decomposer_return_type, + ), ) def __call__( diff --git a/python/cuda_cccl/cuda/compute/algorithms/_sort/_segmented_sort.py b/python/cuda_cccl/cuda/compute/algorithms/_sort/_segmented_sort.py index d74cd256a81..33ec5279c36 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_sort/_segmented_sort.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_sort/_segmented_sort.py @@ -9,7 +9,7 @@ from ... import _bindings from ... import _cccl_interop as cccl -from ..._caching import cache_with_registered_key_functions +from ..._caching import cache_build_result, cache_with_registered_key_functions from ..._cccl_interop import call_build, set_cccl_iterator_state from ..._utils.protocols import ( get_data_pointer, @@ -52,15 +52,28 @@ def __init__( self.start_offsets_in_cccl = cccl.to_cccl_input_iter(start_offsets_in) self.end_offsets_in_cccl = cccl.to_cccl_input_iter(end_offsets_in) - self.build_result = call_build( - _bindings.DeviceSegmentedSortBuildResult, + build_order = ( _bindings.SortOrder.ASCENDING if order is SortOrder.ASCENDING - else _bindings.SortOrder.DESCENDING, - self.d_in_keys_cccl, - self.d_in_values_cccl, - self.start_offsets_in_cccl, - self.end_offsets_in_cccl, + else _bindings.SortOrder.DESCENDING + ) + self.build_result = cache_build_result( + _bindings.DeviceSegmentedSortBuildResult, + d_in_keys, + d_out_keys, + d_in_values, + d_out_values, + start_offsets_in, + end_offsets_in, + order, + builder=lambda: call_build( + _bindings.DeviceSegmentedSortBuildResult, + build_order, + self.d_in_keys_cccl, + self.d_in_values_cccl, + self.start_offsets_in_cccl, + self.end_offsets_in_cccl, + ), ) def __call__( diff --git a/python/cuda_cccl/cuda/compute/algorithms/_three_way_partition.py b/python/cuda_cccl/cuda/compute/algorithms/_three_way_partition.py index fbd3154feb8..d5b076028c5 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_three_way_partition.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_three_way_partition.py @@ -9,7 +9,7 @@ from .. import _bindings, types from .. import _cccl_interop as cccl -from .._caching import cache_with_registered_key_functions +from .._caching import cache_build_result, cache_with_registered_key_functions from .._cccl_interop import call_build, set_cccl_iterator_state from .._utils import protocols from .._utils.temp_storage_buffer import TempStorageBuffer @@ -54,15 +54,25 @@ def __init__( (value_type,), types.uint8 ) - self.build_result = call_build( + self.build_result = cache_build_result( _bindings.DeviceThreeWayPartitionBuildResult, - self.d_in_cccl, - self.d_first_part_out_cccl, - self.d_second_part_out_cccl, - self.d_unselected_out_cccl, - self.d_num_selected_out_cccl, - self.select_first_part_op_cccl, - self.select_second_part_op_cccl, + d_in, + d_first_part_out, + d_second_part_out, + d_unselected_out, + d_num_selected_out, + select_first_part_op, + select_second_part_op, + builder=lambda: call_build( + _bindings.DeviceThreeWayPartitionBuildResult, + self.d_in_cccl, + self.d_first_part_out_cccl, + self.d_second_part_out_cccl, + self.d_unselected_out_cccl, + self.d_num_selected_out_cccl, + self.select_first_part_op_cccl, + self.select_second_part_op_cccl, + ), ) def __call__( diff --git a/python/cuda_cccl/cuda/compute/algorithms/_transform.py b/python/cuda_cccl/cuda/compute/algorithms/_transform.py index f987efd915c..4e32f9dfbdc 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_transform.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_transform.py @@ -9,7 +9,7 @@ from .. import _bindings from .. import _cccl_interop as cccl -from .._caching import cache_with_registered_key_functions +from .._caching import cache_build_result, cache_with_registered_key_functions from .._cccl_interop import set_cccl_iterator_state from .._utils import protocols from ..op import OpAdapter, make_op_adapter @@ -33,11 +33,17 @@ def __init__( out_type = cccl.get_value_type(d_out) self.op_cccl = op.compile((in_type,), out_type) - self.build_result = cccl.call_build( + self.build_result = cache_build_result( _bindings.DeviceUnaryTransform, - self.d_in_cccl, - self.d_out_cccl, - self.op_cccl, + d_in, + d_out, + op, + builder=lambda: cccl.call_build( + _bindings.DeviceUnaryTransform, + self.d_in_cccl, + self.d_out_cccl, + self.op_cccl, + ), ) def __call__( @@ -92,12 +98,19 @@ def __init__( out_type = cccl.get_value_type(d_out) self.op_cccl = op.compile((in1_type, in2_type), out_type) - self.build_result = cccl.call_build( + self.build_result = cache_build_result( _bindings.DeviceBinaryTransform, - self.d_in1_cccl, - self.d_in2_cccl, - self.d_out_cccl, - self.op_cccl, + d_in1, + d_in2, + d_out, + op, + builder=lambda: cccl.call_build( + _bindings.DeviceBinaryTransform, + self.d_in1_cccl, + self.d_in2_cccl, + self.d_out_cccl, + self.op_cccl, + ), ) def __call__( diff --git a/python/cuda_cccl/cuda/compute/algorithms/_unique_by_key.py b/python/cuda_cccl/cuda/compute/algorithms/_unique_by_key.py index 0e39f182507..7b42038dfbf 100644 --- a/python/cuda_cccl/cuda/compute/algorithms/_unique_by_key.py +++ b/python/cuda_cccl/cuda/compute/algorithms/_unique_by_key.py @@ -8,7 +8,7 @@ from .. import _bindings, types from .. import _cccl_interop as cccl -from .._caching import cache_with_registered_key_functions +from .._caching import cache_build_result, cache_with_registered_key_functions from .._cccl_interop import call_build, set_cccl_iterator_state from .._utils.protocols import ( get_data_pointer, @@ -49,14 +49,23 @@ def __init__( value_type = cccl.get_value_type(d_in_keys) self.op_cccl = op.compile((value_type, value_type), types.uint8) - self.build_result = call_build( + self.build_result = cache_build_result( _bindings.DeviceUniqueByKeyBuildResult, - self.d_in_keys_cccl, - self.d_in_items_cccl, - self.d_out_keys_cccl, - self.d_out_items_cccl, - self.d_out_num_selected_cccl, - self.op_cccl, + d_in_keys, + d_in_items, + d_out_keys, + d_out_items, + d_out_num_selected, + op, + builder=lambda: call_build( + _bindings.DeviceUniqueByKeyBuildResult, + self.d_in_keys_cccl, + self.d_in_items_cccl, + self.d_out_keys_cccl, + self.d_out_items_cccl, + self.d_out_num_selected_cccl, + self.op_cccl, + ), ) def __call__( diff --git a/python/cuda_cccl/cuda/compute/iterators/_base.py b/python/cuda_cccl/cuda/compute/iterators/_base.py index cc1b1b83fc8..746f2dc5931 100644 --- a/python/cuda_cccl/cuda/compute/iterators/_base.py +++ b/python/cuda_cccl/cuda/compute/iterators/_base.py @@ -9,6 +9,7 @@ from __future__ import annotations import hashlib +import threading from typing import Hashable from .._bindings import Iterator, IteratorKind, IteratorState, Op @@ -54,6 +55,7 @@ class IteratorBase: "_input_deref_op", "_output_deref_op", "_uid_cached", + "_op_lock", ] def __init__( @@ -75,6 +77,11 @@ def __init__( self._input_deref_op: Op | None = None self._output_deref_op: Op | None = None self._uid_cached: str | None = None + # Free-threaded Python can let multiple threads share a read-only + # iterator object and race during the first lazy Op construction. + # The lock only protects that cache miss path; cached access stays + # lock-free and iterator mutation remains the caller's responsibility. + self._op_lock = threading.Lock() @property def state(self) -> IteratorState: @@ -117,19 +124,25 @@ def _make_output_deref_symbol(self) -> str: def get_advance_op(self) -> Op: """Get the cached Op for the advance operation.""" if self._advance_op is None: - self._advance_op = self._make_advance_op() + with self._op_lock: + if self._advance_op is None: + self._advance_op = self._make_advance_op() return self._advance_op def get_input_deref_op(self) -> Op | None: """Get the cached Op for input dereference operation, or None if not supported.""" if self._input_deref_op is None: - self._input_deref_op = self._make_input_deref_op() + with self._op_lock: + if self._input_deref_op is None: + self._input_deref_op = self._make_input_deref_op() return self._input_deref_op def get_output_deref_op(self) -> Op | None: """Get the cached Op for output dereference operation, or None if not supported.""" if self._output_deref_op is None: - self._output_deref_op = self._make_output_deref_op() + with self._op_lock: + if self._output_deref_op is None: + self._output_deref_op = self._make_output_deref_op() return self._output_deref_op @property diff --git a/python/cuda_cccl/pyproject.toml b/python/cuda_cccl/pyproject.toml index 98e0e6533c0..ce18d198f5f 100644 --- a/python/cuda_cccl/pyproject.toml +++ b/python/cuda_cccl/pyproject.toml @@ -177,4 +177,5 @@ markers = [ "no_verify_sass: skip SASS verification check", "large: tests requiring large device memory allocations", "no_numba: tests that should not import numba or numba.cuda", + "free_threading: tests requiring free-threaded CPython with the GIL disabled", ] diff --git a/python/cuda_cccl/tests/compute/examples/free_threading/__init__.py b/python/cuda_cccl/tests/compute/examples/free_threading/__init__.py new file mode 100644 index 00000000000..8bbe3ce1ab8 --- /dev/null +++ b/python/cuda_cccl/tests/compute/examples/free_threading/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception diff --git a/python/cuda_cccl/tests/compute/examples/free_threading/direct_api.py b/python/cuda_cccl/tests/compute/examples/free_threading/direct_api.py new file mode 100644 index 00000000000..02cbe05a6f6 --- /dev/null +++ b/python/cuda_cccl/tests/compute/examples/free_threading/direct_api.py @@ -0,0 +1,46 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +# example-begin +""" +Run independent direct API calls from multiple Python threads. +""" + +from concurrent.futures import ThreadPoolExecutor + +import cupy as cp +import numpy as np + +import cuda.compute +from cuda.compute import OpKind + + +def reduce_values(h_input): + dtype = np.int32 + h_init = np.array([0], dtype=dtype) + d_input = cp.asarray(h_input, dtype=dtype) + d_output = cp.empty(1, dtype=dtype) + + cuda.compute.reduce_into( + d_in=d_input, + d_out=d_output, + num_items=len(h_input), + op=OpKind.PLUS, + h_init=h_init, + ) + + return int(d_output.get()[0]) + + +inputs = [ + np.array([1, 2, 3, 4], dtype=np.int32), + np.array([5, 6, 7, 8], dtype=np.int32), +] + +with ThreadPoolExecutor(max_workers=len(inputs)) as executor: + results = list(executor.map(reduce_values, inputs)) + +expected = [int(np.sum(h_input)) for h_input in inputs] +assert results == expected +print(f"Free-threaded direct API results: {results}") diff --git a/python/cuda_cccl/tests/compute/examples/free_threading/object_api.py b/python/cuda_cccl/tests/compute/examples/free_threading/object_api.py new file mode 100644 index 00000000000..2b7ec1de3ba --- /dev/null +++ b/python/cuda_cccl/tests/compute/examples/free_threading/object_api.py @@ -0,0 +1,63 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +# example-begin +""" +Run independent object-based API calls from multiple Python threads. +""" + +from concurrent.futures import ThreadPoolExecutor + +import cupy as cp +import numpy as np + +import cuda.compute +from cuda.compute import OpKind + + +def reduce_values(h_input): + dtype = np.int32 + h_init = np.array([0], dtype=dtype) + d_input = cp.asarray(h_input, dtype=dtype) + d_output = cp.empty(1, dtype=dtype) + + reducer = cuda.compute.make_reduce_into( + d_in=d_input, + d_out=d_output, + op=OpKind.PLUS, + h_init=h_init, + ) + temp_storage_size = reducer( + temp_storage=None, + d_in=d_input, + d_out=d_output, + num_items=len(h_input), + op=OpKind.PLUS, + h_init=h_init, + ) + d_temp_storage = cp.empty(temp_storage_size, dtype=np.uint8) + + reducer( + temp_storage=d_temp_storage, + d_in=d_input, + d_out=d_output, + num_items=len(h_input), + op=OpKind.PLUS, + h_init=h_init, + ) + + return int(d_output.get()[0]) + + +inputs = [ + np.array([1, 2, 3, 4], dtype=np.int32), + np.array([5, 6, 7, 8], dtype=np.int32), +] + +with ThreadPoolExecutor(max_workers=len(inputs)) as executor: + results = list(executor.map(reduce_values, inputs)) + +expected = [int(np.sum(h_input)) for h_input in inputs] +assert results == expected +print(f"Free-threaded object API results: {results}") diff --git a/python/cuda_cccl/tests/compute/test_free_threading_stress.py b/python/cuda_cccl/tests/compute/test_free_threading_stress.py new file mode 100644 index 00000000000..1e40996fbe5 --- /dev/null +++ b/python/cuda_cccl/tests/compute/test_free_threading_stress.py @@ -0,0 +1,1445 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +from __future__ import annotations + +import concurrent.futures +import os +import sys +import sysconfig +import threading +from dataclasses import dataclass +from typing import Callable + +import numpy as np +import pytest + + +pytestmark = [ + pytest.mark.free_threading, + pytest.mark.no_numba, + pytest.mark.no_verify_sass( + reason="Free-threading stress tests intentionally run concurrent workers." + ), +] + +STRESS_ITERATIONS = int(os.environ.get("CCCL_FREE_THREADING_STRESS_ITERATIONS", "10")) +STRESS_THREADS = int(os.environ.get("CCCL_FREE_THREADING_STRESS_THREADS", "2")) +TRANSFORM_NATIVE_CACHE_THREADS = int( + os.environ.get( + "CCCL_FREE_THREADING_TRANSFORM_NATIVE_CACHE_THREADS", + str(max(STRESS_THREADS, 4)), + ) +) + + +def _is_free_threaded_build() -> bool: + return sysconfig.get_config_var("Py_GIL_DISABLED") in (1, "1") + + +def _assert_gil_disabled(where: str) -> None: + is_gil_enabled = getattr(sys, "_is_gil_enabled", None) + if is_gil_enabled is not None and is_gil_enabled(): + pytest.fail(f"the GIL is enabled {where}") + + +def _require_free_threaded_python() -> None: + if not _is_free_threaded_build(): + pytest.skip("requires a free-threaded CPython build") + _assert_gil_disabled("before importing cuda.compute") + + +@pytest.fixture +def compute_modules(): + _require_free_threaded_python() + + import cupy as cp + + _assert_gil_disabled("after importing cupy") + + import cuda.compute as cc + + _assert_gil_disabled("after importing cuda.compute") + cc.clear_all_caches() + try: + yield cp, cc + finally: + cc.clear_all_caches() + + +class _CudaStream: + def __init__(self, stream): + self.stream = stream + + def __cuda_stream__(self): + return (0, self.stream.ptr) + + @property + def ptr(self): + return self.stream.ptr + + +def _make_stream(cp): + stream = cp.cuda.Stream() + return stream, _CudaStream(stream) + + +def _run_threaded(workers: list[Callable[[threading.Barrier], None]]) -> None: + barrier = threading.Barrier(len(workers)) + with concurrent.futures.ThreadPoolExecutor(max_workers=len(workers)) as executor: + futures = [executor.submit(worker, barrier) for worker in workers] + for future in futures: + future.result() + _assert_gil_disabled("after concurrent cuda.compute operations") + + +def _call_with_temp(cp, algorithm, **kwargs): + temp_storage_bytes = algorithm(temp_storage=None, **kwargs) + temp_storage = cp.empty(temp_storage_bytes, dtype=np.uint8) + return algorithm(temp_storage=temp_storage, **kwargs) + + +def _get_build_result(algorithm): + if hasattr(algorithm, "build_result"): + return algorithm.build_result + if hasattr(algorithm, "partitioner"): + return _get_build_result(algorithm.partitioner) + raise AssertionError(f"{type(algorithm).__name__} does not expose a build result") + + +def _selected_segments(keys, values, starts, ends, descending=False): + out_keys = keys.copy() + out_values = values.copy() + for start, end in zip(starts, ends): + segment_keys = keys[start:end] + order = np.argsort(segment_keys, kind="stable") + if descending: + order = order[::-1] + out_keys[start:end] = segment_keys[order] + out_values[start:end] = values[start:end][order] + return out_keys, out_values + + +@dataclass(frozen=True) +class _AlgorithmCase: + name: str + make_shared: Callable + make_worker: Callable + run: Callable + check: Callable + + def __str__(self): + return self.name + + +def _run_thread_local_algorithm_case(cp, cc, case: _AlgorithmCase) -> None: + warm_algorithm = case.make_shared(cp, cc) + + warm_worker = case.make_worker(cp, cc, worker_id=0, iteration=-1) + case.run(cp, cc, warm_algorithm, warm_worker) + case.check(cp, cc, warm_worker) + + for iteration in range(STRESS_ITERATIONS): + worker_state = [ + case.make_worker(cp, cc, worker_id=worker_id, iteration=iteration) + for worker_id in range(STRESS_THREADS) + ] + returned_algorithms = [None] * STRESS_THREADS + + def make_thread(worker_id, worker): + def thread(barrier): + barrier.wait() + algorithm = case.make_shared(cp, cc) + returned_algorithms[worker_id] = algorithm + case.run(cp, cc, algorithm, worker) + case.check(cp, cc, worker) + + return thread + + _run_threaded( + [make_thread(worker_id, worker) for worker_id, worker in enumerate(worker_state)] + ) + + assert len({id(algorithm) for algorithm in returned_algorithms}) == len( + returned_algorithms + ) + assert len( + {id(_get_build_result(algorithm)) for algorithm in returned_algorithms} + ) == 1 + + +def _make_reduce_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_in = np.arange(64, dtype=np.int32) + worker_id * 101 + iteration + h_init = np.array([7 + worker_id], dtype=np.int32) + with stream: + d_in = cp.asarray(h_in) + d_out = cp.empty(1, dtype=np.int32) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_in": h_in, + "d_in": d_in, + "d_out": d_out, + "h_init": h_init, + } + + +def _make_reduce_shared(cp, cc): + worker = _make_reduce_worker(cp, cc, 0, -1) + return cc.make_reduce_into( + d_in=worker["d_in"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + h_init=worker["h_init"], + ) + + +def _run_reduce(cp, cc, reducer, worker): + _call_with_temp( + cp, + reducer, + d_in=worker["d_in"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + h_init=worker["h_init"], + num_items=worker["h_in"].size, + stream=worker["cuda_stream"], + ) + + +def _check_reduce(cp, cc, worker): + worker["stream"].synchronize() + expected = worker["h_in"].sum(dtype=np.int64) + int(worker["h_init"][0]) + assert int(worker["d_out"].get()[0]) == int(expected) + + +def _make_unary_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_in = np.arange(32, dtype=np.int32) + worker_id * 17 + iteration + with stream: + d_in = cp.asarray(h_in) + d_out = cp.empty_like(d_in) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_in": h_in, + "d_in": d_in, + "d_out": d_out, + } + + +def _make_unary_shared(cp, cc): + worker = _make_unary_worker(cp, cc, 0, -1) + return cc.make_unary_transform( + d_in=worker["d_in"], d_out=worker["d_out"], op=cc.OpKind.NEGATE + ) + + +def _make_unary_for_worker(cp, cc, worker): + return cc.make_unary_transform( + d_in=worker["d_in"], d_out=worker["d_out"], op=cc.OpKind.NEGATE + ) + + +def _run_unary(cp, cc, transformer, worker): + transformer( + d_in=worker["d_in"], + d_out=worker["d_out"], + op=cc.OpKind.NEGATE, + num_items=worker["h_in"].size, + stream=worker["cuda_stream"], + ) + + +def _check_unary(cp, cc, worker): + worker["stream"].synchronize() + np.testing.assert_array_equal(worker["d_out"].get(), -worker["h_in"]) + + +def _make_binary_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_in1 = np.arange(32, dtype=np.int32) + worker_id * 13 + h_in2 = np.arange(32, dtype=np.int32) + iteration * 7 + with stream: + d_in1 = cp.asarray(h_in1) + d_in2 = cp.asarray(h_in2) + d_out = cp.empty_like(d_in1) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_in1": h_in1, + "h_in2": h_in2, + "d_in1": d_in1, + "d_in2": d_in2, + "d_out": d_out, + } + + +def _make_binary_shared(cp, cc): + worker = _make_binary_worker(cp, cc, 0, -1) + return cc.make_binary_transform( + d_in1=worker["d_in1"], + d_in2=worker["d_in2"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + ) + + +def _make_binary_for_worker(cp, cc, worker): + return cc.make_binary_transform( + d_in1=worker["d_in1"], + d_in2=worker["d_in2"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + ) + + +def _run_binary(cp, cc, transformer, worker): + transformer( + d_in1=worker["d_in1"], + d_in2=worker["d_in2"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + num_items=worker["h_in1"].size, + stream=worker["cuda_stream"], + ) + + +def _check_binary(cp, cc, worker): + worker["stream"].synchronize() + np.testing.assert_array_equal(worker["d_out"].get(), worker["h_in1"] + worker["h_in2"]) + + +def _make_scan_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_in = np.arange(1, 33, dtype=np.int32) + worker_id + iteration + h_init = np.array([3 + worker_id], dtype=np.int32) + with stream: + d_in = cp.asarray(h_in) + d_out = cp.empty_like(d_in) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_in": h_in, + "h_init": h_init, + "d_in": d_in, + "d_out": d_out, + } + + +def _make_exclusive_scan_shared(cp, cc): + worker = _make_scan_worker(cp, cc, 0, -1) + return cc.make_exclusive_scan( + d_in=worker["d_in"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + init_value=worker["h_init"], + ) + + +def _make_inclusive_scan_shared(cp, cc): + worker = _make_scan_worker(cp, cc, 0, -1) + return cc.make_inclusive_scan( + d_in=worker["d_in"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + init_value=worker["h_init"], + ) + + +def _run_scan(cp, cc, scanner, worker): + _call_with_temp( + cp, + scanner, + d_in=worker["d_in"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + init_value=worker["h_init"], + num_items=worker["h_in"].size, + stream=worker["cuda_stream"], + ) + + +def _check_exclusive_scan(cp, cc, worker): + worker["stream"].synchronize() + expected = np.empty_like(worker["h_in"]) + expected[0] = worker["h_init"][0] + expected[1:] = worker["h_init"][0] + np.cumsum(worker["h_in"][:-1]) + np.testing.assert_array_equal(worker["d_out"].get(), expected) + + +def _check_inclusive_scan(cp, cc, worker): + worker["stream"].synchronize() + expected = worker["h_init"][0] + np.cumsum(worker["h_in"]) + np.testing.assert_array_equal(worker["d_out"].get(), expected) + + +def _make_segmented_reduce_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_in = np.arange(1, 17, dtype=np.int32) + worker_id * 3 + iteration + h_start_offsets = np.array([0, 3, 8, 12], dtype=np.int32) + h_end_offsets = np.array([3, 8, 12, 16], dtype=np.int32) + h_init = np.array([worker_id], dtype=np.int32) + with stream: + d_in = cp.asarray(h_in) + d_out = cp.empty(len(h_start_offsets), dtype=np.int32) + d_start_offsets = cp.asarray(h_start_offsets) + d_end_offsets = cp.asarray(h_end_offsets) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_in": h_in, + "h_start_offsets": h_start_offsets, + "h_end_offsets": h_end_offsets, + "h_init": h_init, + "d_in": d_in, + "d_out": d_out, + "d_start_offsets": d_start_offsets, + "d_end_offsets": d_end_offsets, + } + + +def _make_segmented_reduce_shared(cp, cc): + worker = _make_segmented_reduce_worker(cp, cc, 0, -1) + return cc.make_segmented_reduce( + d_in=worker["d_in"], + d_out=worker["d_out"], + start_offsets_in=worker["d_start_offsets"], + end_offsets_in=worker["d_end_offsets"], + op=cc.OpKind.PLUS, + h_init=worker["h_init"], + ) + + +def _run_segmented_reduce(cp, cc, reducer, worker): + _call_with_temp( + cp, + reducer, + d_in=worker["d_in"], + d_out=worker["d_out"], + num_segments=len(worker["h_start_offsets"]), + start_offsets_in=worker["d_start_offsets"], + end_offsets_in=worker["d_end_offsets"], + op=cc.OpKind.PLUS, + h_init=worker["h_init"], + stream=worker["cuda_stream"], + ) + + +def _check_segmented_reduce(cp, cc, worker): + worker["stream"].synchronize() + expected = np.array( + [ + worker["h_in"][start:end].sum() + worker["h_init"][0] + for start, end in zip(worker["h_start_offsets"], worker["h_end_offsets"]) + ], + dtype=np.int32, + ) + np.testing.assert_array_equal(worker["d_out"].get(), expected) + + +def _make_histogram_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + lower = np.float32(worker_id * 10) + upper = np.float32(lower + 8) + h_samples = np.array( + [ + lower + 0.5, + lower + 1.5, + lower + 2.0, + lower + 3.5, + lower + 6.0, + upper + 1.0, + ], + dtype=np.float32, + ) + h_num_levels = np.array([5], dtype=np.int32) + h_lower = np.array([lower], dtype=np.float32) + h_upper = np.array([upper], dtype=np.float32) + with stream: + d_samples = cp.asarray(h_samples) + d_histogram = cp.zeros(h_num_levels[0] - 1, dtype=np.int32) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_samples": h_samples, + "h_num_levels": h_num_levels, + "h_lower": h_lower, + "h_upper": h_upper, + "d_samples": d_samples, + "d_histogram": d_histogram, + } + + +def _make_histogram_shared(cp, cc): + worker = _make_histogram_worker(cp, cc, 0, -1) + return cc.make_histogram_even( + d_samples=worker["d_samples"], + d_histogram=worker["d_histogram"], + h_num_output_levels=worker["h_num_levels"], + h_lower_level=worker["h_lower"], + h_upper_level=worker["h_upper"], + num_samples=worker["h_samples"].size, + ) + + +def _run_histogram(cp, cc, histogrammer, worker): + with worker["stream"]: + worker["d_histogram"].fill(0) + _call_with_temp( + cp, + histogrammer, + d_samples=worker["d_samples"], + d_histogram=worker["d_histogram"], + h_num_output_levels=worker["h_num_levels"], + h_lower_level=worker["h_lower"], + h_upper_level=worker["h_upper"], + num_samples=worker["h_samples"].size, + stream=worker["cuda_stream"], + ) + + +def _check_histogram(cp, cc, worker): + worker["stream"].synchronize() + expected, _ = np.histogram( + worker["h_samples"], + bins=int(worker["h_num_levels"][0] - 1), + range=(float(worker["h_lower"][0]), float(worker["h_upper"][0])), + ) + np.testing.assert_array_equal(worker["d_histogram"].get(), expected.astype(np.int32)) + + +def _make_binary_search_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_data = np.array([90, 70, 50, 30, 10], dtype=np.int32) - worker_id + h_values = np.array([95, 70, 45, 10, 5], dtype=np.int32) - worker_id + with stream: + d_data = cp.asarray(h_data) + d_values = cp.asarray(h_values) + d_out = cp.empty(h_values.size, dtype=np.uintp) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_data": h_data, + "h_values": h_values, + "d_data": d_data, + "d_values": d_values, + "d_out": d_out, + } + + +def _make_lower_bound_shared(cp, cc): + worker = _make_binary_search_worker(cp, cc, 0, -1) + return cc.make_lower_bound( + d_data=worker["d_data"], + d_values=worker["d_values"], + d_out=worker["d_out"], + comp=cc.OpKind.GREATER, + ) + + +def _make_upper_bound_shared(cp, cc): + worker = _make_binary_search_worker(cp, cc, 0, -1) + return cc.make_upper_bound( + d_data=worker["d_data"], + d_values=worker["d_values"], + d_out=worker["d_out"], + comp=cc.OpKind.GREATER, + ) + + +def _run_binary_search(cp, cc, searcher, worker): + searcher( + d_data=worker["d_data"], + num_items=worker["h_data"].size, + d_values=worker["d_values"], + num_values=worker["h_values"].size, + d_out=worker["d_out"], + comp=cc.OpKind.GREATER, + stream=worker["cuda_stream"], + ) + + +def _check_lower_bound(cp, cc, worker): + worker["stream"].synchronize() + expected = np.searchsorted(-worker["h_data"], -worker["h_values"], side="left") + np.testing.assert_array_equal(worker["d_out"].get(), expected.astype(np.uintp)) + + +def _check_upper_bound(cp, cc, worker): + worker["stream"].synchronize() + expected = np.searchsorted(-worker["h_data"], -worker["h_values"], side="right") + np.testing.assert_array_equal(worker["d_out"].get(), expected.astype(np.uintp)) + + +def _make_select_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_in = np.array( + [True, False, worker_id % 2 == 0, True, False, iteration % 2 == 0], + dtype=np.bool_, + ) + with stream: + d_in = cp.asarray(h_in) + d_out = cp.empty_like(d_in) + d_count = cp.empty(2, dtype=np.uint64) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_in": h_in, + "d_in": d_in, + "d_out": d_out, + "d_count": d_count, + } + + +def _make_select_shared(cp, cc): + worker = _make_select_worker(cp, cc, 0, -1) + return cc.make_select( + d_in=worker["d_in"], + d_out=worker["d_out"], + d_num_selected_out=worker["d_count"], + cond=cc.OpKind.IDENTITY, + ) + + +def _run_select(cp, cc, selector, worker): + _call_with_temp( + cp, + selector, + d_in=worker["d_in"], + d_out=worker["d_out"], + d_num_selected_out=worker["d_count"], + cond=cc.OpKind.IDENTITY, + num_items=worker["h_in"].size, + stream=worker["cuda_stream"], + ) + + +def _check_select(cp, cc, worker): + worker["stream"].synchronize() + count = int(worker["d_count"].get()[0]) + expected = worker["h_in"][worker["h_in"]] + assert count == expected.size + np.testing.assert_array_equal(worker["d_out"].get()[:count], expected) + + +def _make_three_way_shared(cp, cc): + worker = _make_select_worker(cp, cc, 0, -1) + d_unselected = cp.empty_like(worker["d_in"]) + return cc.make_three_way_partition( + d_in=worker["d_in"], + d_first_part_out=worker["d_out"], + d_second_part_out=d_unselected, + d_unselected_out=cp.empty_like(worker["d_in"]), + d_num_selected_out=worker["d_count"], + select_first_part_op=cc.OpKind.IDENTITY, + select_second_part_op=cc.OpKind.LOGICAL_NOT, + ) + + +def _make_three_way_worker(cp, cc, worker_id, iteration): + worker = _make_select_worker(cp, cc, worker_id, iteration) + stream = worker["stream"] + with stream: + worker["d_second_out"] = cp.empty_like(worker["d_in"]) + worker["d_unselected"] = cp.empty_like(worker["d_in"]) + return worker + + +def _run_three_way(cp, cc, partitioner, worker): + _call_with_temp( + cp, + partitioner, + d_in=worker["d_in"], + d_first_part_out=worker["d_out"], + d_second_part_out=worker["d_second_out"], + d_unselected_out=worker["d_unselected"], + d_num_selected_out=worker["d_count"], + select_first_part_op=cc.OpKind.IDENTITY, + select_second_part_op=cc.OpKind.LOGICAL_NOT, + num_items=worker["h_in"].size, + stream=worker["cuda_stream"], + ) + + +def _check_three_way(cp, cc, worker): + worker["stream"].synchronize() + counts = worker["d_count"].get() + true_count = int(np.count_nonzero(worker["h_in"])) + false_count = int(worker["h_in"].size - true_count) + assert int(counts[0]) == true_count + assert int(counts[1]) == false_count + np.testing.assert_array_equal( + worker["d_out"].get()[:true_count], np.ones(true_count, dtype=np.bool_) + ) + np.testing.assert_array_equal( + worker["d_second_out"].get()[:false_count], np.zeros(false_count, dtype=np.bool_) + ) + + +def _make_unique_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + base = worker_id * 10 + iteration + h_keys = np.array([base, base, base + 1, base + 2, base + 2, base + 3], dtype=np.int32) + h_items = np.arange(h_keys.size, dtype=np.int32) + worker_id * 100 + with stream: + d_in_keys = cp.asarray(h_keys) + d_in_items = cp.asarray(h_items) + d_out_keys = cp.empty_like(d_in_keys) + d_out_items = cp.empty_like(d_in_items) + d_count = cp.empty(1, dtype=np.int32) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_keys": h_keys, + "h_items": h_items, + "d_in_keys": d_in_keys, + "d_in_items": d_in_items, + "d_out_keys": d_out_keys, + "d_out_items": d_out_items, + "d_count": d_count, + } + + +def _make_unique_shared(cp, cc): + worker = _make_unique_worker(cp, cc, 0, -1) + return cc.make_unique_by_key( + d_in_keys=worker["d_in_keys"], + d_in_items=worker["d_in_items"], + d_out_keys=worker["d_out_keys"], + d_out_items=worker["d_out_items"], + d_out_num_selected=worker["d_count"], + op=cc.OpKind.EQUAL_TO, + ) + + +def _run_unique(cp, cc, uniquer, worker): + _call_with_temp( + cp, + uniquer, + d_in_keys=worker["d_in_keys"], + d_in_items=worker["d_in_items"], + d_out_keys=worker["d_out_keys"], + d_out_items=worker["d_out_items"], + d_out_num_selected=worker["d_count"], + op=cc.OpKind.EQUAL_TO, + num_items=worker["h_keys"].size, + stream=worker["cuda_stream"], + ) + + +def _check_unique(cp, cc, worker): + worker["stream"].synchronize() + selected = np.concatenate(([True], worker["h_keys"][1:] != worker["h_keys"][:-1])) + expected_keys = worker["h_keys"][selected] + expected_items = worker["h_items"][selected] + count = int(worker["d_count"].get()[0]) + assert count == expected_keys.size + np.testing.assert_array_equal(worker["d_out_keys"].get()[:count], expected_keys) + np.testing.assert_array_equal(worker["d_out_items"].get()[:count], expected_items) + + +def _make_merge_sort_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_keys = np.array([5, 1, 3, 1, 4, 2], dtype=np.int32) + worker_id * 10 + h_values = np.arange(h_keys.size, dtype=np.int32) + iteration * 100 + with stream: + d_in_keys = cp.asarray(h_keys) + d_in_values = cp.asarray(h_values) + d_out_keys = cp.empty_like(d_in_keys) + d_out_values = cp.empty_like(d_in_values) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_keys": h_keys, + "h_values": h_values, + "d_in_keys": d_in_keys, + "d_in_values": d_in_values, + "d_out_keys": d_out_keys, + "d_out_values": d_out_values, + } + + +def _make_merge_sort_shared(cp, cc): + worker = _make_merge_sort_worker(cp, cc, 0, -1) + return cc.make_merge_sort( + d_in_keys=worker["d_in_keys"], + d_in_values=worker["d_in_values"], + d_out_keys=worker["d_out_keys"], + d_out_values=worker["d_out_values"], + op=cc.OpKind.LESS, + ) + + +def _run_merge_sort(cp, cc, sorter, worker): + _call_with_temp( + cp, + sorter, + d_in_keys=worker["d_in_keys"], + d_in_values=worker["d_in_values"], + d_out_keys=worker["d_out_keys"], + d_out_values=worker["d_out_values"], + op=cc.OpKind.LESS, + num_items=worker["h_keys"].size, + stream=worker["cuda_stream"], + ) + + +def _check_merge_sort(cp, cc, worker): + worker["stream"].synchronize() + order = np.argsort(worker["h_keys"], kind="stable") + np.testing.assert_array_equal(worker["d_out_keys"].get(), worker["h_keys"][order]) + np.testing.assert_array_equal(worker["d_out_values"].get(), worker["h_values"][order]) + + +def _make_radix_sort_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_keys = np.array([7, 3, 5, 3, 1, 9], dtype=np.uint32) + np.uint32(worker_id * 11) + h_values = np.arange(h_keys.size, dtype=np.int32) + iteration * 10 + with stream: + d_in_keys = cp.asarray(h_keys) + d_tmp_keys = cp.empty_like(d_in_keys) + d_in_values = cp.asarray(h_values) + d_tmp_values = cp.empty_like(d_in_values) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_keys": h_keys, + "h_values": h_values, + "keys": cc.DoubleBuffer(d_in_keys, d_tmp_keys), + "values": cc.DoubleBuffer(d_in_values, d_tmp_values), + } + + +def _make_radix_sort_shared(cp, cc): + worker = _make_radix_sort_worker(cp, cc, 0, -1) + return cc.make_radix_sort( + d_in_keys=worker["keys"], + d_out_keys=None, + d_in_values=worker["values"], + d_out_values=None, + order=cc.SortOrder.ASCENDING, + ) + + +def _run_radix_sort(cp, cc, sorter, worker): + _call_with_temp( + cp, + sorter, + d_in_keys=worker["keys"], + d_out_keys=None, + d_in_values=worker["values"], + d_out_values=None, + num_items=worker["h_keys"].size, + stream=worker["cuda_stream"], + ) + + +def _check_radix_sort(cp, cc, worker): + worker["stream"].synchronize() + order = np.argsort(worker["h_keys"], kind="stable") + np.testing.assert_array_equal(worker["keys"].current().get(), worker["h_keys"][order]) + np.testing.assert_array_equal(worker["values"].current().get(), worker["h_values"][order]) + assert worker["keys"].selector == worker["values"].selector + + +def _make_segmented_sort_worker(cp, cc, worker_id, iteration): + stream, cuda_stream = _make_stream(cp) + h_keys = np.array([4, 2, 3, 8, 6, 7, 1, 5], dtype=np.int32) + worker_id * 13 + h_values = np.arange(h_keys.size, dtype=np.int32) + iteration * 100 + h_start_offsets = np.array([0, 3, 6], dtype=np.int32) + h_end_offsets = np.array([3, 6, 8], dtype=np.int32) + with stream: + d_in_keys = cp.asarray(h_keys) + d_tmp_keys = cp.empty_like(d_in_keys) + d_in_values = cp.asarray(h_values) + d_tmp_values = cp.empty_like(d_in_values) + d_start_offsets = cp.asarray(h_start_offsets) + d_end_offsets = cp.asarray(h_end_offsets) + return { + "stream": stream, + "cuda_stream": cuda_stream, + "h_keys": h_keys, + "h_values": h_values, + "h_start_offsets": h_start_offsets, + "h_end_offsets": h_end_offsets, + "keys": cc.DoubleBuffer(d_in_keys, d_tmp_keys), + "values": cc.DoubleBuffer(d_in_values, d_tmp_values), + "d_start_offsets": d_start_offsets, + "d_end_offsets": d_end_offsets, + } + + +def _make_segmented_sort_shared(cp, cc): + worker = _make_segmented_sort_worker(cp, cc, 0, -1) + return cc.make_segmented_sort( + d_in_keys=worker["keys"], + d_out_keys=None, + d_in_values=worker["values"], + d_out_values=None, + start_offsets_in=worker["d_start_offsets"], + end_offsets_in=worker["d_end_offsets"], + order=cc.SortOrder.ASCENDING, + ) + + +def _run_segmented_sort(cp, cc, sorter, worker): + _call_with_temp( + cp, + sorter, + d_in_keys=worker["keys"], + d_out_keys=None, + d_in_values=worker["values"], + d_out_values=None, + num_items=worker["h_keys"].size, + num_segments=worker["h_start_offsets"].size, + start_offsets_in=worker["d_start_offsets"], + end_offsets_in=worker["d_end_offsets"], + stream=worker["cuda_stream"], + ) + + +def _check_segmented_sort(cp, cc, worker): + worker["stream"].synchronize() + expected_keys, expected_values = _selected_segments( + worker["h_keys"], + worker["h_values"], + worker["h_start_offsets"], + worker["h_end_offsets"], + ) + np.testing.assert_array_equal(worker["keys"].current().get(), expected_keys) + np.testing.assert_array_equal(worker["values"].current().get(), expected_values) + assert worker["keys"].selector == worker["values"].selector + + +SHARED_ALGORITHM_CASES = [ + _AlgorithmCase("reduce", _make_reduce_shared, _make_reduce_worker, _run_reduce, _check_reduce), + _AlgorithmCase( + "unary_transform", _make_unary_shared, _make_unary_worker, _run_unary, _check_unary + ), + _AlgorithmCase( + "binary_transform", + _make_binary_shared, + _make_binary_worker, + _run_binary, + _check_binary, + ), + _AlgorithmCase( + "exclusive_scan", + _make_exclusive_scan_shared, + _make_scan_worker, + _run_scan, + _check_exclusive_scan, + ), + _AlgorithmCase( + "inclusive_scan", + _make_inclusive_scan_shared, + _make_scan_worker, + _run_scan, + _check_inclusive_scan, + ), + _AlgorithmCase( + "segmented_reduce", + _make_segmented_reduce_shared, + _make_segmented_reduce_worker, + _run_segmented_reduce, + _check_segmented_reduce, + ), + _AlgorithmCase( + "histogram", + _make_histogram_shared, + _make_histogram_worker, + _run_histogram, + _check_histogram, + ), + _AlgorithmCase( + "lower_bound", + _make_lower_bound_shared, + _make_binary_search_worker, + _run_binary_search, + _check_lower_bound, + ), + _AlgorithmCase( + "upper_bound", + _make_upper_bound_shared, + _make_binary_search_worker, + _run_binary_search, + _check_upper_bound, + ), + _AlgorithmCase("select", _make_select_shared, _make_select_worker, _run_select, _check_select), + _AlgorithmCase( + "three_way_partition", + _make_three_way_shared, + _make_three_way_worker, + _run_three_way, + _check_three_way, + ), + _AlgorithmCase( + "unique_by_key", _make_unique_shared, _make_unique_worker, _run_unique, _check_unique + ), + _AlgorithmCase( + "merge_sort", + _make_merge_sort_shared, + _make_merge_sort_worker, + _run_merge_sort, + _check_merge_sort, + ), + _AlgorithmCase( + "radix_sort", + _make_radix_sort_shared, + _make_radix_sort_worker, + _run_radix_sort, + _check_radix_sort, + ), + _AlgorithmCase( + "segmented_sort", + _make_segmented_sort_shared, + _make_segmented_sort_worker, + _run_segmented_sort, + _check_segmented_sort, + ), +] + + +def test_free_threaded_import_keeps_gil_disabled(compute_modules): + cp, cc = compute_modules + + h_in = np.arange(8, dtype=np.int32) + d_in = cp.asarray(h_in) + d_out = cp.empty(1, dtype=np.int32) + h_init = np.array([0], dtype=np.int32) + + cc.reduce_into( + d_in=d_in, + d_out=d_out, + num_items=h_in.size, + op=cc.OpKind.PLUS, + h_init=h_init, + ) + + assert int(d_out.get()[0]) == int(h_in.sum()) + _assert_gil_disabled("after running cuda.compute smoke operation") + + +@pytest.mark.parametrize("case", SHARED_ALGORITHM_CASES, ids=str) +def test_thread_local_algorithm_objects_share_build_result(compute_modules, case): + cp, cc = compute_modules + + _run_thread_local_algorithm_case(cp, cc, case) + + +def _cache_miss_reduce(cp, cc, worker_id, iteration): + worker = _make_reduce_worker(cp, cc, worker_id, iteration) + reducer = cc.make_reduce_into( + d_in=worker["d_in"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + h_init=worker["h_init"], + ) + _run_reduce(cp, cc, reducer, worker) + _check_reduce(cp, cc, worker) + return reducer + + +def _cache_miss_unary_transform(cp, cc, worker_id, iteration): + worker = _make_unary_worker(cp, cc, worker_id, iteration) + transformer = cc.make_unary_transform( + d_in=worker["d_in"], d_out=worker["d_out"], op=cc.OpKind.NEGATE + ) + _run_unary(cp, cc, transformer, worker) + _check_unary(cp, cc, worker) + return transformer + + +def _cache_miss_binary_transform(cp, cc, worker_id, iteration): + worker = _make_binary_worker(cp, cc, worker_id, iteration) + transformer = cc.make_binary_transform( + d_in1=worker["d_in1"], + d_in2=worker["d_in2"], + d_out=worker["d_out"], + op=cc.OpKind.PLUS, + ) + _run_binary(cp, cc, transformer, worker) + _check_binary(cp, cc, worker) + return transformer + + +@pytest.mark.parametrize( + "factory", + [_cache_miss_reduce, _cache_miss_unary_transform, _cache_miss_binary_transform], + ids=["reduce", "unary_transform", "binary_transform"], +) +def test_same_key_factory_cache_miss_storm(compute_modules, factory): + cp, cc = compute_modules + + for iteration in range(STRESS_ITERATIONS): + cc.clear_all_caches() + returned_objects = [None] * STRESS_THREADS + + def make_thread(worker_id): + def thread(barrier): + barrier.wait() + returned_objects[worker_id] = factory(cp, cc, worker_id, iteration) + + return thread + + _run_threaded([make_thread(worker_id) for worker_id in range(STRESS_THREADS)]) + + assert len({id(obj) for obj in returned_objects}) == len(returned_objects) + assert len({id(_get_build_result(obj)) for obj in returned_objects}) == 1 + + +def test_shared_raw_op_object_direct_algorithm_stress(compute_modules): + cp, cc = compute_modules + + from cuda.compute._cpp_compile import compile_cpp_op_code + from cuda.compute.op import RawOp + + source = """ + extern "C" __device__ void raw_add_i32(void* a, void* b, void* result) { + *static_cast(result) = *static_cast(a) + *static_cast(b); + } + """ + shared_op = RawOp(ltoir=compile_cpp_op_code(source), name="raw_add_i32") + + for iteration in range(STRESS_ITERATIONS): + cc.clear_all_caches() + returned_reducers = [None] * STRESS_THREADS + + def make_thread(worker_id): + stream, cuda_stream = _make_stream(cp) + h_in = np.arange(32, dtype=np.int32) + worker_id * 31 + iteration + h_init = np.array([worker_id + 5], dtype=np.int32) + with stream: + d_in = cp.asarray(h_in) + d_out = cp.empty(1, dtype=np.int32) + + def thread(barrier): + barrier.wait() + reducer = cc.make_reduce_into( + d_in=d_in, + d_out=d_out, + op=shared_op, + h_init=h_init, + ) + returned_reducers[worker_id] = reducer + _call_with_temp( + cp, + reducer, + d_in=d_in, + d_out=d_out, + op=shared_op, + h_init=h_init, + num_items=h_in.size, + stream=cuda_stream, + ) + stream.synchronize() + expected = int(h_in.sum(dtype=np.int64) + h_init[0]) + assert int(d_out.get()[0]) == expected + + return thread + + _run_threaded([make_thread(worker_id) for worker_id in range(STRESS_THREADS)]) + + assert len({id(reducer) for reducer in returned_reducers}) == len( + returned_reducers + ) + assert len({id(_get_build_result(reducer)) for reducer in returned_reducers}) == 1 + + +@dataclass(frozen=True) +class _IteratorCase: + name: str + make_iterator: Callable + dtype: np.dtype + num_items: int + expected_sum: int + + def __str__(self): + return self.name + + +@dataclass(frozen=True) +class _ColdTransformCase: + name: str + make_worker: Callable + make_transformer: Callable + run: Callable + check: Callable + + def __str__(self): + return self.name + + +def _run_cold_transform_native_cache_case(cp, cc, case: _ColdTransformCase) -> None: + for iteration in range(STRESS_ITERATIONS): + cc.clear_all_caches() + workers = [ + case.make_worker(cp, cc, worker_id=worker_id, iteration=iteration) + for worker_id in range(TRANSFORM_NATIVE_CACHE_THREADS) + ] + returned_algorithms = [None] * TRANSFORM_NATIVE_CACHE_THREADS + # Transform's native launch config cache is filled on first execution, + # so build wrappers first and synchronize the first call separately. + execute_barrier = threading.Barrier(TRANSFORM_NATIVE_CACHE_THREADS) + + def make_thread(worker_id, worker): + def thread(barrier): + barrier.wait() + try: + algorithm = case.make_transformer(cp, cc, worker) + returned_algorithms[worker_id] = algorithm + except BaseException: + execute_barrier.abort() + raise + + execute_barrier.wait(timeout=60) + case.run(cp, cc, algorithm, worker) + case.check(cp, cc, worker) + + return thread + + _run_threaded( + [make_thread(worker_id, worker) for worker_id, worker in enumerate(workers)] + ) + + assert len({id(algorithm) for algorithm in returned_algorithms}) == len( + returned_algorithms + ) + assert len( + {id(_get_build_result(algorithm)) for algorithm in returned_algorithms} + ) == 1 + + +@pytest.mark.parametrize( + "case", + [ + _ColdTransformCase( + "unary_transform", + _make_unary_worker, + _make_unary_for_worker, + _run_unary, + _check_unary, + ), + _ColdTransformCase( + "binary_transform", + _make_binary_worker, + _make_binary_for_worker, + _run_binary, + _check_binary, + ), + ], + ids=str, +) +def test_cold_transform_native_cache_initialization_stress(compute_modules, case): + cp, cc = compute_modules + + _run_cold_transform_native_cache_case(cp, cc, case) + + +def _iterator_counting(cp, cc): + return cc.CountingIterator(np.int32(0)), np.dtype(np.int32), 32, sum(range(32)) + + +def _iterator_constant(cp, cc): + return cc.ConstantIterator(np.int32(5)), np.dtype(np.int32), 32, 32 * 5 + + +def _iterator_cache_modified(cp, cc): + h_in = np.arange(32, dtype=np.int32) + d_in = cp.asarray(h_in) + return cc.CacheModifiedInputIterator(d_in, "stream"), h_in.dtype, h_in.size, int(h_in.sum()) + + +def _iterator_reverse(cp, cc): + h_in = np.arange(32, dtype=np.int32) + d_in = cp.asarray(h_in) + return cc.ReverseIterator(d_in), h_in.dtype, h_in.size, int(h_in.sum()) + + +def _iterator_permutation(cp, cc): + h_values = np.arange(32, dtype=np.int32) + h_indices = np.arange(31, -1, -1, dtype=np.int32) + d_values = cp.asarray(h_values) + d_indices = cp.asarray(h_indices) + return ( + cc.PermutationIterator(d_values, d_indices), + h_values.dtype, + h_indices.size, + int(h_values[h_indices].sum()), + ) + + +def _iterator_shuffle(cp, cc): + num_items = 32 + return ( + cc.ShuffleIterator(num_items, seed=1234), + np.dtype(np.int64), + num_items, + sum(range(num_items)), + ) + + +def _iterator_transform(cp, cc): + from cuda.compute import types + from cuda.compute._cpp_compile import compile_cpp_op_code + from cuda.compute.op import RawOp + + num_items = 32 + source = """ + extern "C" __device__ void negate_i32(void* input, void* result) { + *static_cast(result) = -*static_cast(input); + } + """ + op = RawOp(ltoir=compile_cpp_op_code(source), name="negate_i32") + return ( + cc.TransformIterator(cc.CountingIterator(np.int32(0)), op, value_type=types.int32), + np.dtype(np.int32), + num_items, + -sum(range(num_items)), + ) + + +ITERATOR_FACTORIES = [ + _iterator_counting, + _iterator_constant, + _iterator_cache_modified, + _iterator_reverse, + _iterator_permutation, + _iterator_shuffle, + _iterator_transform, +] + + +@pytest.mark.parametrize( + "make_iterator", + ITERATOR_FACTORIES, + ids=lambda fn: fn.__name__.removeprefix("_iterator_"), +) +def test_shared_iterator_object_stress(compute_modules, make_iterator): + cp, cc = compute_modules + + shared_iterator, dtype, num_items, expected_sum = make_iterator(cp, cc) + cp.cuda.Device().synchronize() + + for iteration in range(STRESS_ITERATIONS): + cc.clear_all_caches() + + def make_thread(worker_id): + stream, cuda_stream = _make_stream(cp) + h_init = np.array([worker_id], dtype=dtype) + with stream: + d_out = cp.empty(1, dtype=dtype) + + def thread(barrier): + barrier.wait() + reducer = cc.make_reduce_into( + d_in=shared_iterator, + d_out=d_out, + op=cc.OpKind.PLUS, + h_init=h_init, + ) + _call_with_temp( + cp, + reducer, + d_in=shared_iterator, + d_out=d_out, + op=cc.OpKind.PLUS, + h_init=h_init, + num_items=num_items, + stream=cuda_stream, + ) + stream.synchronize() + assert int(d_out.get()[0]) == int(expected_sum + h_init[0]) + + return thread + + _run_threaded([make_thread(worker_id) for worker_id in range(STRESS_THREADS)]) + + +def test_runtime_ownership_isolation(compute_modules): + cp, cc = compute_modules + + def make_thread(worker_id): + def thread(barrier): + barrier.wait() + stream, cuda_stream = _make_stream(cp) + h_in = np.arange(16, dtype=np.int32) + worker_id * 10 + h_init = np.array([worker_id], dtype=np.int32) + + with stream: + d_in = cp.asarray(h_in) + d_reduce_out = cp.empty(1, dtype=np.int32) + d_scan_out = cp.empty_like(d_in) + d_transform_out = cp.empty_like(d_in) + d_hist = cp.zeros(4, dtype=np.int32) + h_keys = np.array([3, 1, 2, 1], dtype=np.uint32) + worker_id + d_keys_in = cp.asarray(h_keys) + d_keys_tmp = cp.empty_like(d_keys_in) + + cc.reduce_into( + d_in=d_in, + d_out=d_reduce_out, + num_items=h_in.size, + op=cc.OpKind.PLUS, + h_init=h_init, + stream=cuda_stream, + ) + cc.exclusive_scan( + d_in=d_in, + d_out=d_scan_out, + op=cc.OpKind.PLUS, + init_value=h_init, + num_items=h_in.size, + stream=cuda_stream, + ) + cc.unary_transform( + d_in=d_in, + d_out=d_transform_out, + op=cc.OpKind.NEGATE, + num_items=h_in.size, + stream=cuda_stream, + ) + cc.histogram_even( + d_samples=d_in, + d_histogram=d_hist, + num_output_levels=5, + lower_level=np.int32(worker_id * 10), + upper_level=np.int32(worker_id * 10 + 16), + num_samples=h_in.size, + stream=cuda_stream, + ) + keys = cc.DoubleBuffer(d_keys_in, d_keys_tmp) + cc.radix_sort( + d_in_keys=keys, + d_out_keys=None, + d_in_values=None, + d_out_values=None, + num_items=d_keys_in.size, + order=cc.SortOrder.ASCENDING, + stream=cuda_stream, + ) + + stream.synchronize() + assert int(d_reduce_out.get()[0]) == int(h_in.sum() + worker_id) + expected_scan = np.empty_like(h_in) + expected_scan[0] = worker_id + expected_scan[1:] = worker_id + np.cumsum(h_in[:-1]) + np.testing.assert_array_equal(d_scan_out.get(), expected_scan) + np.testing.assert_array_equal(d_transform_out.get(), -h_in) + assert int(d_hist.sum().get()) == h_in.size + np.testing.assert_array_equal(keys.current().get(), np.sort(h_keys)) + + return thread + + for _ in range(STRESS_ITERATIONS): + _run_threaded([make_thread(worker_id) for worker_id in range(STRESS_THREADS)]) + + +def test_cache_clear_while_active_operations_is_not_a_supported_contract(): + pytest.skip( + "clear_all_caches() while cached operations are active is an unsupported " + "contract decision; see ST-19 in stress_tests.md." + ) diff --git a/python/cuda_cccl/tests/compute/test_no_numba.py b/python/cuda_cccl/tests/compute/test_no_numba.py index 8f1d271e1ed..6fb1ef0e811 100644 --- a/python/cuda_cccl/tests/compute/test_no_numba.py +++ b/python/cuda_cccl/tests/compute/test_no_numba.py @@ -1,16 +1,108 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + import cupy as cp import numpy as np import pytest import cuda.compute -from cuda.compute import OpKind +from cuda.compute import ( + CacheModifiedInputIterator, + ConstantIterator, + CountingIterator, + DiscardIterator, + OpKind, + PermutationIterator, + ReverseIterator, + ShuffleIterator, + SortOrder, + TransformIterator, + TransformOutputIterator, + ZipIterator, +) +from cuda.compute._cpp_compile import compile_cpp_op_code +from cuda.compute.op import RawOp +from cuda.compute.types import int16 as cccl_int16 +from cuda.compute.types import int32 as cccl_int32 -# Mainly, these tests check that we can use algorithms with OpKind -# operators while not requiring numba to be installed. +# These tests define the minimal-extra integration contract. They intentionally +# use small fixed inputs and avoid the Python-callable operator path. pytestmark = pytest.mark.no_numba -@pytest.mark.no_numba +def _raw_op(source: str, name: str) -> RawOp: + return RawOp(ltoir=compile_cpp_op_code(source), name=name) + + +def _raw_even_i32_op() -> RawOp: + source = """ +extern "C" __device__ void no_numba_even_i32(void* x, void* result) { + int value = *static_cast(x); + *static_cast(result) = (value % 2) == 0; +} +""" + return _raw_op(source, "no_numba_even_i32") + + +def _raw_less_than_i32_op(name: str, threshold: int) -> RawOp: + source = f""" +extern "C" __device__ void {name}(void* x, void* result) {{ + int value = *static_cast(x); + *static_cast(result) = value < {threshold} ? 1 : 0; +}} +""" + return _raw_op(source, name) + + +def _raw_plus_i64_op() -> RawOp: + source = """ +extern "C" __device__ void no_numba_plus_i64( + void* lhs, + void* rhs, + void* result +) { + *static_cast(result) = + *static_cast(lhs) + *static_cast(rhs); +} +""" + return _raw_op(source, "no_numba_plus_i64") + + +def _raw_square_i32_op() -> RawOp: + source = """ +extern "C" __device__ void no_numba_square_i32(void* x, void* result) { + int value = *static_cast(x); + *static_cast(result) = value * value; +} +""" + return _raw_op(source, "no_numba_square_i32") + + +def _raw_zip_sum_i32_op() -> RawOp: + source = """ +struct Zip2I32 { + int field_0; + int field_1; +}; + +extern "C" __device__ void no_numba_zip_sum_i32(void* x, void* result) { + auto values = static_cast(x); + *static_cast(result) = values->field_0 + values->field_1; +} +""" + return _raw_op(source, "no_numba_zip_sum_i32") + + +def _raw_negate_i16_op() -> RawOp: + source = """ +extern "C" __device__ void no_numba_negate_i16(void* x, void* result) { + *static_cast(result) = -*static_cast(x); +} +""" + return _raw_op(source, "no_numba_negate_i16") + + def test_import_numba_raises(): with pytest.raises( ImportError, match="This test is marked 'no_numba' but attempted to import it" @@ -18,68 +110,459 @@ def test_import_numba_raises(): import numba.cuda # noqa: F401 -def test_reduce_op_kind(): - num_items = 100 - h_input = np.arange(num_items, dtype=np.int32) - d_input = cp.array(h_input) +def test_reduce_well_known_plus(): + h_input = np.arange(1, 14, dtype=np.int32) + d_input = cp.asarray(h_input) d_output = cp.empty(1, dtype=np.int32) + h_init = np.array([5], dtype=np.int32) - h_init = np.array(0, dtype=np.int32) cuda.compute.reduce_into( - d_in=d_input, d_out=d_output, num_items=num_items, op=OpKind.PLUS, h_init=h_init + d_in=d_input, + d_out=d_output, + num_items=d_input.size, + op=OpKind.PLUS, + h_init=h_init, + ) + + assert d_output.get()[0] == np.sum(h_input, initial=h_init[0]) + + +def test_exclusive_scan_well_known_plus(): + d_input = cp.asarray([2, 4, 6, 8, 10, 12], dtype=np.uint16) + d_output = cp.empty_like(d_input) + h_init = np.array([1], dtype=np.uint16) + + cuda.compute.exclusive_scan( + d_in=d_input, + d_out=d_output, + op=OpKind.PLUS, + init_value=h_init, + num_items=d_input.size, ) - result = d_output.get()[0] - expected = np.sum(h_input) - assert result == expected + expected = np.asarray([1, 3, 7, 13, 21, 31], dtype=np.uint16) + np.testing.assert_array_equal(d_output.get(), expected) -def test_binary_transform_op_kind(): - num_items = 100 - h_input1 = np.arange(num_items, dtype=np.int32) - h_input2 = np.arange(num_items, dtype=np.int32) * 2 - d_input1 = cp.array(h_input1) - d_input2 = cp.array(h_input2) - d_output = cp.empty(num_items, dtype=np.int32) +def test_binary_transform_well_known_plus(): + d_lhs = cp.asarray([1.5, 2.5, 3.5, 4.5], dtype=np.float32) + d_rhs = cp.asarray([10.0, 20.0, 30.0, 40.0], dtype=np.float32) + d_output = cp.empty_like(d_lhs) cuda.compute.binary_transform( - d_in1=d_input1, - d_in2=d_input2, + d_in1=d_lhs, + d_in2=d_rhs, d_out=d_output, op=OpKind.PLUS, - num_items=num_items, + num_items=d_lhs.size, ) - result = d_output.get() - expected = h_input1 + h_input2 - assert np.array_equal(result, expected) + np.testing.assert_allclose(d_output.get(), d_lhs.get() + d_rhs.get()) + + +def test_unary_transform_well_known_negate(): + d_input = cp.asarray([-4, -2, 0, 2, 4], dtype=np.int8) + d_output = cp.empty_like(d_input) + + cuda.compute.unary_transform( + d_in=d_input, + d_out=d_output, + op=OpKind.NEGATE, + num_items=d_input.size, + ) + + np.testing.assert_array_equal(d_output.get(), np.asarray([4, 2, 0, -2, -4])) + + +@pytest.mark.parametrize( + "search, side", + [ + (cuda.compute.lower_bound, "left"), + (cuda.compute.upper_bound, "right"), + ], +) +def test_binary_search_explicit_opkind_less(search, side): + h_data = np.asarray([1, 3, 3, 7, 9, 11], dtype=np.int64) + h_values = np.asarray([0, 3, 4, 10, 12], dtype=np.int64) + d_out = cp.empty(h_values.size, dtype=np.uintp) + + search( + d_data=cp.asarray(h_data), + num_items=h_data.size, + d_values=cp.asarray(h_values), + num_values=h_values.size, + d_out=d_out, + comp=OpKind.LESS, + ) + + expected = np.searchsorted(h_data, h_values, side=side).astype(np.uintp) + np.testing.assert_array_equal(d_out.get(), expected) + + +def test_segmented_reduce_well_known_plus(monkeypatch): + monkeypatch.setattr(cuda.compute._cccl_interop, "_check_sass", False) + + d_input = cp.asarray([1, 2, 3, 4, 5, 6, 7, 8], dtype=np.uint32) + d_starts = cp.asarray([0, 3, 5], dtype=np.int32) + d_ends = cp.asarray([3, 5, 8], dtype=np.int32) + d_output = cp.empty(3, dtype=np.uint32) + h_init = np.array([0], dtype=np.uint32) + + cuda.compute.segmented_reduce( + d_in=d_input, + d_out=d_output, + num_segments=3, + start_offsets_in=d_starts, + end_offsets_in=d_ends, + op=OpKind.PLUS, + h_init=h_init, + ) + + np.testing.assert_array_equal(d_output.get(), np.asarray([6, 9, 21])) + + +def test_merge_sort_well_known_less(): + d_input = cp.asarray([3.5, -1.0, 2.25, 2.0, 7.0], dtype=np.float64) + d_output = cp.empty_like(d_input) + + cuda.compute.merge_sort( + d_in_keys=d_input, + d_in_values=None, + d_out_keys=d_output, + d_out_values=None, + num_items=d_input.size, + op=OpKind.LESS, + ) + + np.testing.assert_array_equal(d_output.get(), np.sort(d_input.get())) + + +def test_radix_sort_key_value_pairs(): + h_keys = np.asarray([4, -2, 7, 1, -2, 0], dtype=np.int16) + h_values = np.asarray([40, 20, 70, 10, 21, 0], dtype=np.uint8) + d_out_keys = cp.empty_like(cp.asarray(h_keys)) + d_out_values = cp.empty_like(cp.asarray(h_values)) + cuda.compute.radix_sort( + d_in_keys=cp.asarray(h_keys), + d_out_keys=d_out_keys, + d_in_values=cp.asarray(h_values), + d_out_values=d_out_values, + num_items=h_keys.size, + order=SortOrder.ASCENDING, + ) -def test_segmented_sort_op_kind(): - # Create segments: [3, 1, 4] | [1, 5, 9, 2] | [6, 5] - num_items = 9 - h_keys = np.array([3, 1, 4, 1, 5, 9, 2, 6, 5], dtype=np.int32) - h_offsets = np.array([0, 3, 7, 9], dtype=np.int32) + order = np.argsort(h_keys, stable=True) + np.testing.assert_array_equal(d_out_keys.get(), h_keys[order]) + np.testing.assert_array_equal(d_out_values.get(), h_values[order]) - d_keys_in = cp.array(h_keys) - d_keys_out = cp.empty(num_items, dtype=np.int32) - d_offsets = cp.array(h_offsets) - num_segments = len(h_offsets) - 1 +def test_segmented_sort_keys(): + h_keys = np.asarray([3, 1, 2, 9, 7, 8, 6, 5], dtype=np.uint64) + h_offsets = np.asarray([0, 3, 6, 8], dtype=np.int64) + d_output = cp.empty_like(cp.asarray(h_keys)) cuda.compute.segmented_sort( - d_in_keys=d_keys_in, - d_out_keys=d_keys_out, + d_in_keys=cp.asarray(h_keys), + d_out_keys=d_output, d_in_values=None, d_out_values=None, + num_items=h_keys.size, + num_segments=h_offsets.size - 1, + start_offsets_in=cp.asarray(h_offsets[:-1]), + end_offsets_in=cp.asarray(h_offsets[1:]), + order=SortOrder.ASCENDING, + ) + + expected = np.asarray([1, 2, 3, 7, 8, 9, 5, 6], dtype=np.uint64) + np.testing.assert_array_equal(d_output.get(), expected) + + +def test_unique_by_key_well_known_equal_to(monkeypatch): + cc_major, _ = cuda.compute._cccl_interop.CudaDevice().compute_capability + if cc_major >= 9: + monkeypatch.setattr(cuda.compute._cccl_interop, "_check_sass", False) + + d_keys = cp.asarray([1, 1, 2, 2, 2, 3, 4, 4], dtype=np.int16) + d_values = cp.asarray([10, 11, 20, 21, 22, 30, 40, 41], dtype=np.int8) + d_out_keys = cp.empty_like(d_keys) + d_out_values = cp.empty_like(d_values) + d_num_selected = cp.empty(1, dtype=np.int64) + + cuda.compute.unique_by_key( + d_in_keys=d_keys, + d_in_items=d_values, + d_out_keys=d_out_keys, + d_out_items=d_out_values, + d_out_num_selected=d_num_selected, + op=OpKind.EQUAL_TO, + num_items=d_keys.size, + ) + + num_selected = int(d_num_selected.get()[0]) + np.testing.assert_array_equal(d_out_keys.get()[:num_selected], [1, 2, 3, 4]) + np.testing.assert_array_equal(d_out_values.get()[:num_selected], [10, 20, 30, 40]) + + +def test_histogram_even_small_range(): + h_samples = np.asarray([0.5, 1.5, 2.5, 2.75, 3.0, 3.5], dtype=np.float32) + d_histogram = cp.empty(4, dtype=np.int32) + + cuda.compute.histogram_even( + d_samples=cp.asarray(h_samples), + d_histogram=d_histogram, + num_output_levels=5, + lower_level=np.float32(0.0), + upper_level=np.float32(4.0), + num_samples=h_samples.size, + ) + + expected, _ = np.histogram(h_samples, bins=4, range=(0.0, 4.0)) + np.testing.assert_array_equal(d_histogram.get(), expected.astype(np.int32)) + + +def test_select_raw_op(): + h_input = np.arange(12, dtype=np.int32) + d_output = cp.empty_like(cp.asarray(h_input)) + d_num_selected = cp.empty(1, dtype=np.uint64) + + cuda.compute.select( + d_in=cp.asarray(h_input), + d_out=d_output, + d_num_selected_out=d_num_selected, + cond=_raw_even_i32_op(), + num_items=h_input.size, + ) + + num_selected = int(d_num_selected.get()[0]) + np.testing.assert_array_equal(d_output.get()[:num_selected], h_input[::2]) + + +def test_three_way_partition_raw_op(): + h_input = np.arange(12, dtype=np.int32) + d_first = cp.empty_like(cp.asarray(h_input)) + d_second = cp.empty_like(cp.asarray(h_input)) + d_unselected = cp.empty_like(cp.asarray(h_input)) + d_num_selected = cp.empty(2, dtype=np.uint64) + + cuda.compute.three_way_partition( + d_in=cp.asarray(h_input), + d_first_part_out=d_first, + d_second_part_out=d_second, + d_unselected_out=d_unselected, + d_num_selected_out=d_num_selected, + select_first_part_op=_raw_less_than_i32_op("no_numba_less_than_4_i32", 4), + select_second_part_op=_raw_less_than_i32_op("no_numba_less_than_8_i32", 8), + num_items=h_input.size, + ) + + selected = d_num_selected.get() + first_count = int(selected[0]) + second_count = int(selected[1]) + unselected_count = h_input.size - first_count - second_count + + np.testing.assert_array_equal(d_first.get()[:first_count], h_input[:4]) + np.testing.assert_array_equal(d_second.get()[:second_count], h_input[4:8]) + np.testing.assert_array_equal(d_unselected.get()[:unselected_count], h_input[8:]) + + +def test_raw_op_reduce(): + h_input = np.asarray([10, 20, 30, 40], dtype=np.int64) + d_output = cp.empty(1, dtype=np.int64) + + cuda.compute.reduce_into( + d_in=cp.asarray(h_input), + d_out=d_output, + num_items=h_input.size, + op=_raw_plus_i64_op(), + h_init=np.array([5], dtype=np.int64), + ) + + assert d_output.get()[0] == 105 + + +def test_stream_argument(cuda_stream): + d_lhs = cp.asarray([2, 4, 6, 8, 10], dtype=np.int32) + d_rhs = cp.asarray([1, 3, 5, 7, 9], dtype=np.int32) + d_output = cp.empty_like(d_lhs) + + cuda.compute.binary_transform( + d_in1=d_lhs, + d_in2=d_rhs, + d_out=d_output, + op=OpKind.PLUS, + num_items=d_lhs.size, + stream=cuda_stream, + ) + + cp.cuda.Device().synchronize() + np.testing.assert_array_equal(d_output.get(), np.asarray([3, 7, 11, 15, 19])) + + +def test_counting_iterator_reduce(): + d_output = cp.empty(1, dtype=np.int32) + + cuda.compute.reduce_into( + d_in=CountingIterator(np.int32(3)), + d_out=d_output, + num_items=8, + op=OpKind.PLUS, + h_init=np.array([0], dtype=np.int32), + ) + + assert d_output.get()[0] == 52 + + +def test_constant_iterator_reduce(): + d_output = cp.empty(1, dtype=np.float32) + + cuda.compute.reduce_into( + d_in=ConstantIterator(np.float32(1.5)), + d_out=d_output, + num_items=8, + op=OpKind.PLUS, + h_init=np.array([0], dtype=np.float32), + ) + + np.testing.assert_allclose(d_output.get()[0], np.float32(12.0)) + + +def test_cache_modified_input_iterator_reduce(): + d_input = cp.asarray([2, 4, 6, 8, 10], dtype=np.uint16) + d_output = cp.empty(1, dtype=np.uint16) + iterator = CacheModifiedInputIterator(d_input, modifier="stream") + + cuda.compute.reduce_into( + d_in=iterator, + d_out=d_output, + num_items=d_input.size, + op=OpKind.PLUS, + h_init=np.array([0], dtype=np.uint16), + ) + + assert d_output.get()[0] == 30 + + +def test_reverse_input_iterator_scan(): + d_input = cp.asarray([1, 2, 3, 4, 5], dtype=np.int32) + d_output = cp.empty_like(d_input) + + cuda.compute.inclusive_scan( + d_in=ReverseIterator(d_input), + d_out=d_output, + op=OpKind.PLUS, + init_value=np.array([0], dtype=np.int32), + num_items=d_input.size, + ) + + np.testing.assert_array_equal(d_output.get(), np.asarray([5, 9, 12, 14, 15])) + + +def test_reverse_output_iterator_scan(): + d_input = cp.asarray([1, 2, 3, 4, 5], dtype=np.int32) + d_output = cp.empty_like(d_input) + + cuda.compute.inclusive_scan( + d_in=d_input, + d_out=ReverseIterator(d_output), + op=OpKind.PLUS, + init_value=np.array([0], dtype=np.int32), + num_items=d_input.size, + ) + + np.testing.assert_array_equal(d_output.get(), np.asarray([15, 10, 6, 3, 1])) + + +def test_permutation_iterator_reduce(): + d_values = cp.asarray([10, 20, 30, 40, 50, 60], dtype=np.int64) + d_indices = cp.asarray([4, 2, 5, 1], dtype=np.int32) + d_output = cp.empty(1, dtype=np.int64) + + cuda.compute.reduce_into( + d_in=PermutationIterator(d_values, d_indices), + d_out=d_output, + num_items=d_indices.size, + op=OpKind.PLUS, + h_init=np.array([0], dtype=np.int64), + ) + + assert d_output.get()[0] == 160 + + +def test_transform_iterator_reduce(): + d_output = cp.empty(1, dtype=np.int32) + iterator = TransformIterator( + CountingIterator(np.int32(1)), _raw_square_i32_op(), value_type=cccl_int32 + ) + + cuda.compute.reduce_into( + d_in=iterator, + d_out=d_output, + num_items=6, + op=OpKind.PLUS, + h_init=np.array([0], dtype=np.int32), + ) + + assert d_output.get()[0] == 91 + + +def test_transform_output_iterator_reduce(): + d_input = cp.asarray([1, 2, 3, 4], dtype=np.int16) + d_output = cp.empty(1, dtype=np.int16) + output_iterator = TransformOutputIterator( + d_output, _raw_negate_i16_op(), output_value_type=cccl_int16 + ) + + cuda.compute.reduce_into( + d_in=d_input, + d_out=output_iterator, + num_items=d_input.size, + op=OpKind.PLUS, + h_init=np.array([0], dtype=np.int16), + ) + + assert d_output.get()[0] == -10 + + +def test_zip_iterator_transform(): + d_lhs = cp.asarray([1, 2, 3, 4, 5], dtype=np.int32) + d_rhs = cp.asarray([10, 20, 30, 40, 50], dtype=np.int32) + d_output = cp.empty_like(d_lhs) + + cuda.compute.unary_transform( + d_in=ZipIterator(d_lhs, d_rhs), + d_out=d_output, + op=_raw_zip_sum_i32_op(), + num_items=d_lhs.size, + ) + + np.testing.assert_array_equal(d_output.get(), d_lhs.get() + d_rhs.get()) + + +def test_shuffle_iterator_transform(): + num_items = 17 + d_output = cp.empty(num_items, dtype=np.int64) + + cuda.compute.unary_transform( + d_in=ShuffleIterator(num_items, seed=123), + d_out=d_output, + op=OpKind.IDENTITY, num_items=num_items, - num_segments=num_segments, - start_offsets_in=d_offsets[:-1], - end_offsets_in=d_offsets[1:], - order=cuda.compute.SortOrder.ASCENDING, ) - result = d_keys_out.get() - # Expected: [1, 3, 4] | [1, 2, 5, 9] | [5, 6] - expected = np.array([1, 3, 4, 1, 2, 5, 9, 5, 6], dtype=np.int32) - assert np.array_equal(result, expected) + result = d_output.get() + assert sorted(result.tolist()) == list(range(num_items)) + + +def test_discard_iterator_transform(): + d_input = cp.asarray([1, 2, 3, 4, 5], dtype=np.int32) + d_reference = cp.full_like(d_input, -1) + + cuda.compute.unary_transform( + d_in=d_input, + d_out=DiscardIterator(d_reference), + op=OpKind.IDENTITY, + num_items=d_input.size, + ) + + np.testing.assert_array_equal(d_reference.get(), np.full(5, -1, dtype=np.int32))