diff --git a/Include/internal/pycore_frame.h b/Include/internal/pycore_frame.h index 8c410e9e208340..50908f2cb7a1d2 100644 --- a/Include/internal/pycore_frame.h +++ b/Include/internal/pycore_frame.h @@ -44,15 +44,16 @@ extern PyFrameObject* _PyFrame_New_NoTrack(PyCodeObject *code); /* other API */ typedef enum _framestate { - FRAME_CREATED = -3, - FRAME_SUSPENDED = -2, - FRAME_SUSPENDED_YIELD_FROM = -1, + FRAME_CREATED = -4, + FRAME_SUSPENDED = -3, + FRAME_SUSPENDED_YIELD_FROM = -2, + FRAME_SUSPENDED_YIELD_FROM_LOCKED = -1, FRAME_EXECUTING = 0, FRAME_COMPLETED = 1, FRAME_CLEARED = 4 } PyFrameState; -#define FRAME_STATE_SUSPENDED(S) ((S) == FRAME_SUSPENDED || (S) == FRAME_SUSPENDED_YIELD_FROM) +#define FRAME_STATE_SUSPENDED(S) ((S) >= FRAME_SUSPENDED && (S) <= FRAME_SUSPENDED_YIELD_FROM_LOCKED) #define FRAME_STATE_FINISHED(S) ((S) >= FRAME_COMPLETED) #ifdef __cplusplus diff --git a/Include/internal/pycore_lock.h b/Include/internal/pycore_lock.h index c4e007e744ce0f..e31d8b4e5c68c9 100644 --- a/Include/internal/pycore_lock.h +++ b/Include/internal/pycore_lock.h @@ -70,6 +70,9 @@ PyMutex_LockFlags(PyMutex *m, _PyLockFlags flags) // error messages) otherwise returns 0. extern int _PyMutex_TryUnlock(PyMutex *m); +// Yield the processor to other threads (e.g., sched_yield). +extern void _Py_yield(void); + // PyEvent is a one-time event notification typedef struct { diff --git a/Lib/test/support/threading_helper.py b/Lib/test/support/threading_helper.py index 3e04c344a0d66f..cf87233f0e2e93 100644 --- a/Lib/test/support/threading_helper.py +++ b/Lib/test/support/threading_helper.py @@ -250,21 +250,32 @@ def requires_working_threading(*, module=False): return unittest.skipUnless(can_start_thread, msg) -def run_concurrently(worker_func, nthreads, args=(), kwargs={}): +def run_concurrently(worker_func, nthreads=None, args=(), kwargs={}): """ - Run the worker function concurrently in multiple threads. + Run the worker function(s) concurrently in multiple threads. + + If `worker_func` is a single callable, it is used for all threads. + If it is a list of callables, each callable is used for one thread. """ + from collections.abc import Iterable + + if nthreads is None: + nthreads = len(worker_func) + if not isinstance(worker_func, Iterable): + worker_func = [worker_func] * nthreads + assert len(worker_func) == nthreads + barrier = threading.Barrier(nthreads) - def wrapper_func(*args, **kwargs): + def wrapper_func(func, *args, **kwargs): # Wait for all threads to reach this point before proceeding. barrier.wait() - worker_func(*args, **kwargs) + func(*args, **kwargs) with catch_threading_exception() as cm: workers = [ - threading.Thread(target=wrapper_func, args=args, kwargs=kwargs) - for _ in range(nthreads) + threading.Thread(target=wrapper_func, args=(func, *args), kwargs=kwargs) + for func in worker_func ] with start_threads(workers): pass diff --git a/Lib/test/test_free_threading/test_generators.py b/Lib/test/test_free_threading/test_generators.py index 11f59301bcd51d..2b41e28896f5a8 100644 --- a/Lib/test/test_free_threading/test_generators.py +++ b/Lib/test/test_free_threading/test_generators.py @@ -1,4 +1,6 @@ import concurrent.futures +import itertools +import threading import unittest from threading import Barrier from unittest import TestCase @@ -120,3 +122,38 @@ def drive_generator(g): g = gen() threading_helper.run_concurrently(drive_generator, self.NUM_THREADS, args=(g,)) + + def test_concurrent_gi_yieldfrom(self): + def gen_yield_from(): + yield from itertools.count() + + g = gen_yield_from() + next(g) # Put in FRAME_SUSPENDED_YIELD_FROM state + + def read_yieldfrom(gen): + for _ in range(10000): + self.assertIsNotNone(gen.gi_yieldfrom) + + threading_helper.run_concurrently(read_yieldfrom, self.NUM_THREADS, args=(g,)) + + def test_gi_yieldfrom_close_race(self): + def gen_yield_from(): + yield from itertools.count() + + g = gen_yield_from() + next(g) + + done = threading.Event() + + def reader(): + while not done.is_set(): + g.gi_yieldfrom + + def closer(): + try: + g.close() + except ValueError: + pass + done.set() + + threading_helper.run_concurrently([reader, closer]) diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2026-01-27-17-49-43.gh-issue-120321.Vo7c9T.rst b/Misc/NEWS.d/next/Core_and_Builtins/2026-01-27-17-49-43.gh-issue-120321.Vo7c9T.rst new file mode 100644 index 00000000000000..052ed07c123bcf --- /dev/null +++ b/Misc/NEWS.d/next/Core_and_Builtins/2026-01-27-17-49-43.gh-issue-120321.Vo7c9T.rst @@ -0,0 +1,2 @@ +Made ``gi_yieldfrom`` thread-safe in the free-threading build +by using a lightweight lock on the frame state. diff --git a/Objects/genobject.c b/Objects/genobject.c index fcdb9017a35f5b..cd23afca57e0fc 100644 --- a/Objects/genobject.c +++ b/Objects/genobject.c @@ -10,6 +10,7 @@ #include "pycore_gc.h" // _PyGC_CLEAR_FINALIZED() #include "pycore_genobject.h" // _PyGen_SetStopIterationValue() #include "pycore_interpframe.h" // _PyFrame_GetCode() +#include "pycore_lock.h" // _Py_yield() #include "pycore_modsupport.h" // _PyArg_CheckPositional() #include "pycore_object.h" // _PyObject_GC_UNTRACK() #include "pycore_opcode_utils.h" // RESUME_AFTER_YIELD_FROM @@ -44,6 +45,18 @@ static PyObject* async_gen_athrow_new(PyAsyncGenObject *, PyObject *); ((gen)->gi_frame_state = (state), true) #endif +// Wait for any in-progress gi_yieldfrom read to complete. +static inline void +gen_yield_from_lock_wait(PyGenObject *gen, int8_t *frame_state) +{ +#ifdef Py_GIL_DISABLED + while (*frame_state == FRAME_SUSPENDED_YIELD_FROM_LOCKED) { + _Py_yield(); + *frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state); + } +#endif +} + static const char *NON_INIT_CORO_MSG = "can't send non-None value to a " "just-started coroutine"; @@ -318,6 +331,8 @@ gen_send_ex(PyGenObject *gen, PyObject *arg, PyObject **presult) *presult = NULL; int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state); do { + gen_yield_from_lock_wait(gen, &frame_state); + if (frame_state == FRAME_CREATED && arg && arg != Py_None) { const char *msg = "can't send non-None value to a " "just-started generator"; @@ -452,6 +467,8 @@ gen_close(PyObject *self, PyObject *args) int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state); do { + gen_yield_from_lock_wait(gen, &frame_state); + if (frame_state == FRAME_CREATED) { // && (1) to avoid -Wunreachable-code warning on Clang if (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_CLEARED) && (1)) { @@ -614,6 +631,8 @@ _gen_throw(PyGenObject *gen, int close_on_genexit, { int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state); do { + gen_yield_from_lock_wait(gen, &frame_state); + if (frame_state == FRAME_EXECUTING) { gen_raise_already_executing_error(gen); return NULL; @@ -876,12 +895,25 @@ static PyObject * gen_getyieldfrom(PyObject *self, void *Py_UNUSED(ignored)) { PyGenObject *gen = _PyGen_CAST(self); - int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state); +#ifdef Py_GIL_DISABLED + int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state); + do { + gen_yield_from_lock_wait(gen, &frame_state); + if (frame_state != FRAME_SUSPENDED_YIELD_FROM) { + Py_RETURN_NONE; + } + } while (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_SUSPENDED_YIELD_FROM_LOCKED)); + + PyObject *result = PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe)); + _Py_atomic_store_int8_release(&gen->gi_frame_state, FRAME_SUSPENDED_YIELD_FROM); + return result; +#else + int8_t frame_state = gen->gi_frame_state; if (frame_state != FRAME_SUSPENDED_YIELD_FROM) { Py_RETURN_NONE; } - // TODO: still not thread-safe with free threading return PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe)); +#endif } diff --git a/Python/ceval.c b/Python/ceval.c index 04ae7b4d86f9d0..c59f20bbf1e803 100644 --- a/Python/ceval.c +++ b/Python/ceval.c @@ -3391,7 +3391,9 @@ _PyEval_GetAwaitable(PyObject *iterable, int oparg) else if (PyCoro_CheckExact(iter)) { PyCoroObject *coro = (PyCoroObject *)iter; int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(coro->cr_frame_state); - if (frame_state == FRAME_SUSPENDED_YIELD_FROM) { + if (frame_state == FRAME_SUSPENDED_YIELD_FROM || + frame_state == FRAME_SUSPENDED_YIELD_FROM_LOCKED) + { /* `iter` is a coroutine object that is being awaited. */ Py_CLEAR(iter); _PyErr_SetString(PyThreadState_GET(), PyExc_RuntimeError, diff --git a/Python/ceval_macros.h b/Python/ceval_macros.h index 1cbeb18d02c082..b127812b4bf703 100644 --- a/Python/ceval_macros.h +++ b/Python/ceval_macros.h @@ -522,19 +522,22 @@ gen_try_set_executing(PyGenObject *gen) #ifdef Py_GIL_DISABLED if (!_PyObject_IsUniquelyReferenced((PyObject *)gen)) { int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state); - while (frame_state < FRAME_EXECUTING) { + while (frame_state < FRAME_SUSPENDED_YIELD_FROM_LOCKED) { if (_Py_atomic_compare_exchange_int8(&gen->gi_frame_state, &frame_state, FRAME_EXECUTING)) { return true; } } + // NB: We return false for FRAME_SUSPENDED_YIELD_FROM_LOCKED as well. + // That case is rare enough that we can just handle it in the deopt. return false; } #endif // Use faster non-atomic modifications in the GIL-enabled build and when // the object is uniquely referenced in the free-threaded build. if (gen->gi_frame_state < FRAME_EXECUTING) { + assert(gen->gi_frame_state != FRAME_SUSPENDED_YIELD_FROM_LOCKED); gen->gi_frame_state = FRAME_EXECUTING; return true; } diff --git a/Python/lock.c b/Python/lock.c index 12b5ebc89aeec7..ad97bfd93c8495 100644 --- a/Python/lock.c +++ b/Python/lock.c @@ -40,7 +40,7 @@ struct mutex_entry { int handed_off; }; -static void +void _Py_yield(void) { #ifdef MS_WINDOWS