diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0fe7c99..95781e0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,25 +16,19 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.10", "3.11", "3.12", "3.13"] steps: - - uses: actions/checkout@v2 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + - uses: actions/checkout@v5 + - name: Install uv and set the Python version + uses: astral-sh/setup-uv@v6 with: python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - python -m pip install flake8 - python -m pip install .[test] - - name: Lint with flake8 + + - name: Lint with ruff run: | - # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - #flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - - name: Test with pytest + uvx ruff check + + - name: Run tests run: | - pytest -v + uv run -v pytest -v diff --git a/.gitignore b/.gitignore index 737d578..c1471df 100644 --- a/.gitignore +++ b/.gitignore @@ -128,3 +128,5 @@ dmypy.json # Pyre type checker .pyre/ + +uv.lock diff --git a/README.md b/README.md index 2e3c3d7..26dfe01 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,19 @@ # parallel-numpy-rng [![tests](https://github.com/lgarrison/parallel-numpy-rng/actions/workflows/test.yml/badge.svg)](https://github.com/lgarrison/parallel-numpy-rng/actions/workflows/test.yml) [![PyPI](https://img.shields.io/pypi/v/parallel-numpy-rng)](https://pypi.org/project/parallel-numpy-rng/) -A multi-threaded random number generator backed by Numpy RNG, with parallelism provided by Numba. +A multi-threaded random number generator backed by NumPy RNG, with parallelism provided by Numba. ## Overview -Uses the "fast-forward" capability of the [PCG family of RNG](https://www.pcg-random.org), -as exposed by the [new-style Numpy RNG API](https://numpy.org/doc/stable/reference/random/index.html), -to generate random numbers in a multi-threaded manner. The key -is that you get the same random numbers regardless of how many threads were used. +This package uses the "fast-forward" capability of the [PCG family of RNG](https://www.pcg-random.org), +as exposed by the [new-style NumPy RNG API](https://numpy.org/doc/stable/reference/random/index.html), +to generate arrays of random numbers in a multi-threaded manner. The result depends only on the random +number seed, not the number of threads. Only a two types of random numbers are supported right now: uniform and normal. More could be added if there is demand, although some kinds, like bounded random ints, are hard to parallelize in the approach used here. -The uniform randoms are the same as Numpy produces for a given seed, although the +The uniform randoms are the same as NumPy produces for a given seed, although the random normals are not. ## Example + Performance @@ -34,7 +34,7 @@ numpy_rng = np.random.default_rng(seed) %timeit parallel_rng.standard_normal(size=10**8, dtype=np.float32, nthread=128) # 43.5 ms ``` -Note that the `parallel_rng` is slower than Numpy when using a single thread, because the parallel implementation requires a slower algorithm in some cases (this is especially noticeable for float64 and normals) +Note that the `parallel_rng` is slower than NumPy when using a single thread, because the parallel implementation requires a slower algorithm in some cases (this is especially noticeable for float64 and normals) ## Installation The code works and is [reasonably well tested](./test_parallel_numpy_rng.py), so it's probably ready for use. It can be installed from PyPI: @@ -56,8 +56,8 @@ to the RNG, thus advancing its internal state that many times. Therefore, we wou the second thread's RNG in a state that is already advanced *N/2* times, but without actually making *N/2* calls to get there. -This is known as fast-forwarding, or jump-ahead. [Numpy's new RNG API](https://numpy.org/doc/stable/reference/random/index.html) -(as of Numpy 1.17) uses the PCG RNG that supports this feature, and Numpy exposes an [`advance()` +This is known as fast-forwarding, or jump-ahead. [NumPy's new RNG API](https://numpy.org/doc/stable/reference/random/index.html) +(as of NumPy 1.17) uses the PCG RNG that supports this feature, and NumPy exposes an [`advance()` method](https://numpy.org/doc/stable/reference/random/bit_generators/generated/numpy.random.PCG64.advance.html#numpy.random.PCG64.advance) which implements it. The new API also exposes CFFI bindings to get PCG random values, so we can implement the core loop, including parallelism, in a Numba-compiled function @@ -67,5 +67,5 @@ An interesting consequence of using fast-forwarding is that each random value mu with a known number of calls to the underlying RNG, so that we know how many steps to advance the RNG state by. This means we can't use rejection sampling, which makes a variable number of calls. Fortunately, inverse-transform sampling can usually substitute, or more specific methods -like Box-Muller. These can be slower than rejection sampling (or whatever Numpy uses) with one +like Box-Muller. These can be slower than rejection sampling (or whatever NumPy uses) with one thread, but even just two threads more than makes up for the difference. diff --git a/pyproject.toml b/pyproject.toml index 9b54271..a82832f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,12 +1,47 @@ [build-system] requires = [ "setuptools>=42", - "wheel", "setuptools_scm>=6.2", ] build-backend = "setuptools.build_meta" +[project] +name = "parallel-numpy-rng" +authors = [ + {name = "Lehman Garrison"}, +] +description = "Parallel random number generation that produces the same result, regardless of the number of threads" +readme = "README.md" +requires-python = ">=3.10" +classifiers = [ + "Programming Language :: Python :: 3", +] +dependencies = [ + "numpy>=2", + "numba", +] +dynamic = ["version"] +license = "Apache-2.0" +license-files = ["LICENSE"] + +[project.optional-dependencies] +test = ["pytest"] + +[dependency-groups] +dev = ["parallel-numpy-rng[test]"] + +[project.urls] +Homepage = "https://github.com/lgarrison/parallel-numpy-rng/" + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] + [tool.setuptools_scm] write_to = "src/parallel_numpy_rng/version.py" -write_to_template = '__version__ = "{version}"' + +[tool.ruff.format] +quote-style = "single" diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 0a0e3e0..0000000 --- a/setup.cfg +++ /dev/null @@ -1,24 +0,0 @@ -[metadata] -name = parallel-numpy-rng -author = Lehman Garrison -description = Parallel random number generation that produces the same result, regardless of the number of threads -long_description = file: README.md -long_description_content_type = text/markdown -url = https://github.com/lgarrison/parallel-numpy-rng/ -classifiers = - Programming Language :: Python :: 3 - -[options] -python_requires = >=3.7 -package_dir = - = src -packages = find: -install_requires = - numpy >= 1.17 - numba >= 0.54.1 - -[options.packages.find] -where = src - -[options.extras_require] -test = pytest diff --git a/src/parallel_numpy_rng/__init__.py b/src/parallel_numpy_rng/__init__.py index 3a369b8..ea46588 100644 --- a/src/parallel_numpy_rng/__init__.py +++ b/src/parallel_numpy_rng/__init__.py @@ -1,2 +1,2 @@ -from .version import __version__ -from .parallel_numpy_rng import * +from .version import __version__ as __version__ +from .parallel_numpy_rng import * # noqa: F403 diff --git a/src/parallel_numpy_rng/parallel_numpy_rng.py b/src/parallel_numpy_rng/parallel_numpy_rng.py index cc98ded..44e9586 100644 --- a/src/parallel_numpy_rng/parallel_numpy_rng.py +++ b/src/parallel_numpy_rng/parallel_numpy_rng.py @@ -1,9 +1,9 @@ -''' +""" File: parallel_numpy_rng.py Author: Lehman Garrison (https://github.com/lgarrison) Website: https://github.com/lgarrison/parallel-numpy-rng License: Apache-2.0 -''' +""" import os @@ -13,20 +13,21 @@ from . import utils -__all__ = ['default_rng', 'MTGenerator'] +__all__ = ["default_rng", "MTGenerator"] + def default_rng(seed): - '''A convenience function to create a ``MTGenerator`` with the default - Numpy bit generator''' + """A convenience function to create a ``MTGenerator`` with the default + Numpy bit generator""" return MTGenerator(np.random.PCG64(seed)) class MTGenerator: - ''' + """ A Multi-Threaded random number generator. Generates the same random number stream independent of the number of threads. Implements a subset of the ``numpy.random.Generator`` API. - + Usage ----- ``` @@ -37,7 +38,7 @@ class MTGenerator: r3 = mtg.standard_normal(size=(16,3), nthread=2, dtype=np.float32) r4 = mtg.standard_normal(size=(16,11,12), nthread=2, dtype=np.float32) ``` - + Details ------- This generator lets multiple threads sample from the same logical RNG stream by @@ -46,32 +47,37 @@ class MTGenerator: times---not true for the default Numpy algorithms, which use rejection sampling! Therefore, the algorithms here may be slower than their Numpy counterparts, maybe by a lot. But with enough threads, MTGenerator might be faster. - + Not all numpy.random.Generator methods are implemented. Some kinds of random values, like bounded random ints, are hard to generate without rejection sampling. - + Even though we are relying on implementation details of the underlying RNG (specifically, how many calls to the RNG were made to generate each output value), a key aspect of this is that we don't have to guess if we were right: we can query the state of the RNG after generating values to see if it matched our guess. - ''' + """ + def __init__(self, bit_generator, nthread=-1): self.bitgen = bit_generator if nthread <= 0: nthread = len(os.sched_getaffinity(0)) self.nthread = nthread - + # each is a dict, keyed on dtype - self._next_float = _next_float[bit_generator.state['bit_generator']]['zero'] - self._next_float_nonzero = _next_float[bit_generator.state['bit_generator']]['nonzero'] - + self._next_float = _next_float[bit_generator.state["bit_generator"]]["zero"] + self._next_float_nonzero = _next_float[bit_generator.state["bit_generator"]][ + "nonzero" + ] + self._cached_normal = None - - def random(self, size=None, nthread=None, out=None, verify_rng=True, dtype=np.float64): - if size == None: + + def random( + self, size=None, nthread=None, out=None, verify_rng=True, dtype=np.float64 + ): + if size is None: size = 1 - if nthread == None: + if nthread is None: nthread = self.nthread if nthread < 1: raise ValueError("nthread must be >= 1") @@ -79,8 +85,8 @@ def random(self, size=None, nthread=None, out=None, verify_rng=True, dtype=np.fl size = np.prod(size, dtype=np.intp) if nthread > size: nthread = size - - starts = np.linspace(0, size, num=nthread+1, endpoint=True, dtype=np.int64) + + starts = np.linspace(0, size, num=nthread + 1, endpoint=True, dtype=np.int64) vals_per_call = 2 if dtype == np.float32 else 1 bitgens = [] states = np.empty(nthread, dtype=int) @@ -88,85 +94,88 @@ def random(self, size=None, nthread=None, out=None, verify_rng=True, dtype=np.fl bitgens += [self._copy_bitgen()] self._advance_bitgen(bitgens[-1], starts[t], vals_per_call) states[t] = bitgens[-1].ctypes.state_address - + if out is None: out = np.empty(size, dtype=dtype) next_float = self._next_float[dtype] _random(states, starts, out, next_float) - + if verify_rng: # Did we advance each RNG by the right amount? for t in range(nthread): _b = self._copy_bitgen() - self._advance_bitgen(_b, starts[t+1], vals_per_call) - assert bitgens[t].state['state'] == _b.state['state'] - + self._advance_bitgen(_b, starts[t + 1], vals_per_call) + assert bitgens[t].state["state"] == _b.state["state"] + # finally, advance the base RNG self._advance_bitgen(self.bitgen, size, vals_per_call) - + return out.reshape(_size) - + @staticmethod def _advance_bitgen(bitgen, vals, vals_per_call): - '''Advance the underlying generator, possibly "fractionally" + """Advance the underlying generator, possibly "fractionally" if the generator produces, e.g., two 32-bit values per 64-bit call - ''' - if bitgen.state['has_uint32']: + """ + if bitgen.state["has_uint32"]: assert vals_per_call == 2 # only supposed to happen with float32 bitgen.ctypes.next_uint32(bitgen.ctypes.state) vals -= 1 - bitgen.advance(vals//vals_per_call) + bitgen.advance(int(vals // vals_per_call)) for _ in range(vals % vals_per_call): bitgen.ctypes.next_uint32(bitgen.ctypes.state) - - - def standard_normal(self, size=None, nthread=None, out=None, verify_rng=True, dtype=np.float64): - ''' - + + def standard_normal( + self, size=None, nthread=None, out=None, verify_rng=True, dtype=np.float64 + ): + """ + Parameters ---------- verify_rng: bool Check the correctness; specifically, that each thread made the number of RNG calls expected. Disabling this may improve performance for latency-bound cases. Default: True. - ''' - if size == None: + """ + if size is None: size = 1 _size = size size = np.prod(size, dtype=np.intp) - if nthread == None: + if nthread is None: nthread = self.nthread - if nthread > max(size//2,1): - nthread = max(size//2,1) + if nthread > max(size // 2, 1): + nthread = max(size // 2, 1) if nthread < 1: raise ValueError("nthread must be >= 1") if out is None: out = np.empty(size, dtype=dtype) if size == 0: return out - + first = 0 - if self._cached_normal != None: + if self._cached_normal is not None: # the base RNG will already be advanced just past the cached value out[0] = self._cached_normal self._cached_normal = None first = 1 # where to start writing non-cached values - + # amount to fast-forward each thread's state # force even registration per thread # the last thread will spill into a cache if it overflows the array - ff = 2*np.linspace(0, (size+1-first)//2, num=nthread+1, endpoint=True, dtype=np.int64) + ff = 2 * np.linspace( + 0, (size + 1 - first) // 2, num=nthread + 1, endpoint=True, dtype=np.int64 + ) # note that ff[-1] will be out of bounds because here we're just tracking how much each # rng gets advanced, not where it's writing - + vals_per_call = 2 if dtype == np.float32 else 1 bitgens = [] states = np.empty(nthread, dtype=int) for t in range(nthread): bitgens += [self._copy_bitgen()] - bitgens[-1].advance(ff[t]//vals_per_call) + bitgens[-1].advance(int(ff[t] // vals_per_call)) states[t] = bitgens[-1].ctypes.state_address - + # now offset the out array _out = out[first:] next_float_nonzero = self._next_float_nonzero[dtype] @@ -174,77 +183,76 @@ def standard_normal(self, size=None, nthread=None, out=None, verify_rng=True, dt if not np.isnan(_cached_normal): self._cached_normal = _cached_normal del _out - + # Did we advance each RNG by the right amount? if verify_rng: for t in range(nthread): _b = self._copy_bitgen() - _b.advance(ff[t+1]//vals_per_call) - assert bitgens[t].state['state'] == _b.state['state'], t - + _b.advance(int(ff[t + 1] // vals_per_call)) + assert bitgens[t].state["state"] == _b.state["state"], t + # finally, advance the base RNG - self.bitgen.advance(ff[-1]//vals_per_call) - - return out.reshape(_size) + self.bitgen.advance(int(ff[-1] // vals_per_call)) + return out.reshape(_size) @staticmethod def _advance_bitgen_boxmuller(bitgen, vals, vals_per_call): - '''Advance the bitgen for Box-Muller normals - ''' - bitgen.advance((vals//vals_per_call//2)*2) - return (vals//vals_per_call) % 2 - - + """Advance the bitgen for Box-Muller normals""" + bitgen.advance(int((vals // vals_per_call // 2) * 2)) + return (vals // vals_per_call) % 2 + def _copy_bitgen(self): - '''Return a copy of the base bitgen in its current state''' + """Return a copy of the base bitgen in its current state""" new = self.bitgen.__class__() new.state = self.bitgen.state # this is a deep copy return new - + + @njit(fastmath=True, parallel=True) def _random(states, starts, out, next_double): nthread = len(states) - numba.set_num_threads(max(1,nthread)) + numba.set_num_threads(max(1, nthread)) for t in numba.prange(nthread): a = starts[t] - b = starts[t+1] + b = starts[t + 1] s = states[t] - for i in range(a,b): + for i in range(a, b): out[i] = next_double(s) - -@njit(fastmath=True,parallel=True) + +@njit(fastmath=True, parallel=True) def _boxmuller(states, starts, out, next_double): nthread = len(states) - numba.set_num_threads(max(1,nthread)) + numba.set_num_threads(max(1, nthread)) dtype = out.dtype.type cache = np.full(1, np.nan, dtype=dtype) for t in numba.prange(nthread): a = starts[t] - b = min(starts[t+1],len(out)) + b = min(starts[t + 1], len(out)) s = states[t] - for i in range(a,b,2): + for i in range(a, b, 2): u1 = next_double(s) u2 = next_double(s) - amp = np.sqrt(dtype(-2)*np.log(u1)) - ang = dtype(2*np.pi)*u2 - z0 = amp*np.cos(ang) - z1 = amp*np.sin(ang) + amp = np.sqrt(dtype(-2) * np.log(u1)) + ang = dtype(2 * np.pi) * u2 + z0 = amp * np.cos(ang) + z1 = amp * np.sin(ang) out[i] = z0 - if i+1 < b: - out[i+1] = z1 - elif t == nthread-1: + if i + 1 < b: + out[i + 1] = z1 + elif t == nthread - 1: cache[0] = z1 return cache[0] + _next_float = {} -_next_float['PCG64'] = utils.generate_int_to_float(np.random.PCG64) +_next_float["PCG64"] = utils.generate_int_to_float(np.random.PCG64) -if hasattr(np.random, 'PCG64DXSM'): +if hasattr(np.random, "PCG64DXSM"): # Numpy >= 1.21 - _next_float['PCG64XDSM'] = utils.generate_int_to_float(np.random.PCG64DXSM) + _next_float["PCG64XDSM"] = utils.generate_int_to_float(np.random.PCG64DXSM) diff --git a/src/parallel_numpy_rng/utils.py b/src/parallel_numpy_rng/utils.py index 9e7e838..01b7094 100644 --- a/src/parallel_numpy_rng/utils.py +++ b/src/parallel_numpy_rng/utils.py @@ -1,40 +1,57 @@ -''' +""" Utilities to help generate random values from bits -''' +""" import numpy as np from numba import njit # TODO: for very low latency cases, there may be benefit to inlining these + def generate_int_to_float(bitgen): - '''Return a nested dict of numba functions, keyed by 'zero'/'nonzero' then dtype''' - + """Return a nested dict of numba functions, keyed by 'zero'/'nonzero' then dtype""" + # initialize the numba functions to convert ints to floats _next_uint32_pcg64 = bitgen().ctypes.next_uint32 _next_uint64_pcg64 = bitgen().ctypes.next_uint64 @njit(fastmath=True) def _next_float_pcg64(state): - '''Random float in the semi-open interval [0,1)''' - return np.float32(np.int32(_next_uint32_pcg64(state) >> np.uint32(8)) * (np.float32(1.) / np.float32(16777216.))) + """Random float in the semi-open interval [0,1)""" + return np.float32( + np.int32(_next_uint32_pcg64(state) >> np.uint32(8)) + * (np.float32(1.0) / np.float32(16777216.0)) + ) @njit(fastmath=True) def _next_float_pcg64_nonzero(state): - '''Random float in the semi-open interval (0,1]''' - return np.float32((np.int32(_next_uint32_pcg64(state) >> np.uint32(8)) + np.int32(1)) * (np.float32(1.) / np.float32(16777216.))) + """Random float in the semi-open interval (0,1]""" + return np.float32( + (np.int32(_next_uint32_pcg64(state) >> np.uint32(8)) + np.int32(1)) + * (np.float32(1.0) / np.float32(16777216.0)) + ) @njit(fastmath=True) def _next_double_pcg64(state): - '''Random double in the semi-open interval [0,1)''' - return np.float64(np.int64(_next_uint64_pcg64(state) >> np.uint64(11)) * (np.float64(1.) / np.float64(9007199254740992.))) + """Random double in the semi-open interval [0,1)""" + return np.float64( + np.int64(_next_uint64_pcg64(state) >> np.uint64(11)) + * (np.float64(1.0) / np.float64(9007199254740992.0)) + ) @njit(fastmath=True) def _next_double_pcg64_nonzero(state): - '''Random double in the semi-open interval (0,1]''' - return np.float64((np.int64(_next_uint64_pcg64(state) >> np.uint64(11)) + np.int64(1)) * (np.float64(1.) / np.float64(9007199254740992.))) - - _next_float = {'zero': {np.float32:_next_float_pcg64, np.float64:_next_double_pcg64}, - 'nonzero': {np.float32:_next_float_pcg64_nonzero, np.float64:_next_double_pcg64_nonzero}, - } + """Random double in the semi-open interval (0,1]""" + return np.float64( + (np.int64(_next_uint64_pcg64(state) >> np.uint64(11)) + np.int64(1)) + * (np.float64(1.0) / np.float64(9007199254740992.0)) + ) + + _next_float = { + "zero": {np.float32: _next_float_pcg64, np.float64: _next_double_pcg64}, + "nonzero": { + np.float32: _next_float_pcg64_nonzero, + np.float64: _next_double_pcg64_nonzero, + }, + } return _next_float diff --git a/test_parallel_numpy_rng.py b/test_parallel_numpy_rng.py index 1d14227..3081d74 100644 --- a/test_parallel_numpy_rng.py +++ b/test_parallel_numpy_rng.py @@ -162,7 +162,6 @@ def test_mixing_threads(someN, seed, nthread, dtype): funcname = 'standard_normal' from parallel_numpy_rng import MTGenerator - rng = np.random.default_rng(seed) maxthreads = nthread del nthread