Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Include/internal/pycore_frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ extern PyFrameObject* _PyFrame_New_NoTrack(PyCodeObject *code);
/* other API */

/* Execution state of the frame owned by a generator-like object.
 *
 * The numeric ordering is significant; callers compare states with `<`/`>=`:
 *   - every value below FRAME_EXECUTING means "not running yet / suspended";
 *   - every value from FRAME_COMPLETED upwards means "finished".
 *
 * FRAME_SUSPENDED_YIELD_FROM_LOCKED is a transient variant of
 * FRAME_SUSPENDED_YIELD_FROM: it is set while a reader of `gi_yieldfrom` is
 * inspecting the suspended frame and is switched back to
 * FRAME_SUSPENDED_YIELD_FROM once that read is done. It must stay the last
 * (largest) of the suspended states so the range test below keeps working.
 *
 * Each enumerator is defined exactly once; duplicate definitions with
 * different values are a hard compile error in C.
 */
typedef enum _framestate {
    FRAME_CREATED = -4,
    FRAME_SUSPENDED = -3,
    FRAME_SUSPENDED_YIELD_FROM = -2,
    FRAME_SUSPENDED_YIELD_FROM_LOCKED = -1,
    FRAME_EXECUTING = 0,
    FRAME_COMPLETED = 1,
    FRAME_CLEARED = 4
} PyFrameState;

/* True for FRAME_SUSPENDED, FRAME_SUSPENDED_YIELD_FROM and
 * FRAME_SUSPENDED_YIELD_FROM_LOCKED. Note: evaluates S twice. */
#define FRAME_STATE_SUSPENDED(S) ((S) >= FRAME_SUSPENDED && (S) <= FRAME_SUSPENDED_YIELD_FROM_LOCKED)
/* True for FRAME_COMPLETED and FRAME_CLEARED. */
#define FRAME_STATE_FINISHED(S) ((S) >= FRAME_COMPLETED)

#ifdef __cplusplus
Expand Down
3 changes: 3 additions & 0 deletions Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ PyMutex_LockFlags(PyMutex *m, _PyLockFlags flags)
// error messages) otherwise returns 0.
extern int _PyMutex_TryUnlock(PyMutex *m);

// Yield the processor to other threads (e.g., sched_yield).
extern void _Py_yield(void);


// PyEvent is a one-time event notification
typedef struct {
Expand Down
23 changes: 17 additions & 6 deletions Lib/test/support/threading_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,21 +250,32 @@ def requires_working_threading(*, module=False):
return unittest.skipUnless(can_start_thread, msg)


def run_concurrently(worker_func, nthreads, args=(), kwargs={}):
def run_concurrently(worker_func, nthreads=None, args=(), kwargs={}):
"""
Run the worker function concurrently in multiple threads.
Run the worker function(s) concurrently in multiple threads.

If `worker_func` is a single callable, it is used for all threads.
If it is a list of callables, each callable is used for one thread.
"""
from collections.abc import Iterable

if nthreads is None:
nthreads = len(worker_func)
if not isinstance(worker_func, Iterable):
worker_func = [worker_func] * nthreads
assert len(worker_func) == nthreads

barrier = threading.Barrier(nthreads)

def wrapper_func(*args, **kwargs):
def wrapper_func(func, *args, **kwargs):
# Wait for all threads to reach this point before proceeding.
barrier.wait()
worker_func(*args, **kwargs)
func(*args, **kwargs)

with catch_threading_exception() as cm:
workers = [
threading.Thread(target=wrapper_func, args=args, kwargs=kwargs)
for _ in range(nthreads)
threading.Thread(target=wrapper_func, args=(func, *args), kwargs=kwargs)
for func in worker_func
]
with start_threads(workers):
pass
Expand Down
37 changes: 37 additions & 0 deletions Lib/test/test_free_threading/test_generators.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import concurrent.futures
import itertools
import threading
import unittest
from threading import Barrier
from unittest import TestCase
Expand Down Expand Up @@ -120,3 +122,38 @@ def drive_generator(g):

g = gen()
threading_helper.run_concurrently(drive_generator, self.NUM_THREADS, args=(g,))

def test_concurrent_gi_yieldfrom(self):
    """Read ``gi_yieldfrom`` of one suspended generator from many threads."""

    def delegating_gen():
        yield from itertools.count()

    shared = delegating_gen()
    # Advance to the first yielded value so the generator is parked inside
    # the ``yield from`` (FRAME_SUSPENDED_YIELD_FROM state).
    next(shared)

    def poll_delegate(gen):
        # While nobody resumes the generator, the attribute must keep
        # reporting the delegated-to iterator, never None.
        for _attempt in range(10000):
            delegate = gen.gi_yieldfrom
            self.assertIsNotNone(delegate)

    threading_helper.run_concurrently(
        poll_delegate, self.NUM_THREADS, args=(shared,)
    )

def test_gi_yieldfrom_close_race(self):
    """Race ``gi_yieldfrom`` reads against a concurrent ``close()``.

    One thread reads ``g.gi_yieldfrom`` in a loop while another closes the
    generator. The reads carry no assertion: the test passes if neither
    thread raises an unexpected exception or crashes.
    """
    def gen_yield_from():
        yield from itertools.count()

    g = gen_yield_from()
    # Advance to the first yielded value so the generator is suspended
    # inside the ``yield from``.
    next(g)

    done = threading.Event()

    def reader():
        while not done.is_set():
            g.gi_yieldfrom

    def closer():
        try:
            g.close()
        except ValueError:
            # Tolerated, as in the original test (presumably the
            # "generator already executing" error -- TODO confirm).
            pass
        finally:
            # Always release the reader. If close() raised anything else
            # and this were skipped, reader() would spin forever and the
            # test would hang instead of reporting the failure.
            done.set()

    threading_helper.run_concurrently([reader, closer])
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Made the generator ``gi_yieldfrom`` attribute thread-safe in the free-threaded
build by using a lightweight lock on the frame state.
36 changes: 34 additions & 2 deletions Objects/genobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "pycore_gc.h" // _PyGC_CLEAR_FINALIZED()
#include "pycore_genobject.h" // _PyGen_SetStopIterationValue()
#include "pycore_interpframe.h" // _PyFrame_GetCode()
#include "pycore_lock.h" // _Py_yield()
#include "pycore_modsupport.h" // _PyArg_CheckPositional()
#include "pycore_object.h" // _PyObject_GC_UNTRACK()
#include "pycore_opcode_utils.h" // RESUME_AFTER_YIELD_FROM
Expand Down Expand Up @@ -44,6 +45,18 @@ static PyObject* async_gen_athrow_new(PyAsyncGenObject *, PyObject *);
((gen)->gi_frame_state = (state), true)
#endif

// Block until no gi_yieldfrom reader holds the frame-state "lock".
//
// `*frame_state` is the caller's most recent snapshot of gen->gi_frame_state.
// While that snapshot equals FRAME_SUSPENDED_YIELD_FROM_LOCKED, yield the
// processor and refresh the snapshot with a relaxed atomic load. On return
// `*frame_state` holds some state other than the locked one.
//
// Compiles to a no-op unless Py_GIL_DISABLED is defined.
static inline void
gen_yield_from_lock_wait(PyGenObject *gen, int8_t *frame_state)
{
#ifdef Py_GIL_DISABLED
    for (;;) {
        if (*frame_state != FRAME_SUSPENDED_YIELD_FROM_LOCKED) {
            return;
        }
        // Let the thread that holds the lock make progress, then re-read.
        _Py_yield();
        *frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
    }
#endif
}


static const char *NON_INIT_CORO_MSG = "can't send non-None value to a "
"just-started coroutine";
Expand Down Expand Up @@ -318,6 +331,8 @@ gen_send_ex(PyGenObject *gen, PyObject *arg, PyObject **presult)
*presult = NULL;
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state);
do {
gen_yield_from_lock_wait(gen, &frame_state);

if (frame_state == FRAME_CREATED && arg && arg != Py_None) {
const char *msg = "can't send non-None value to a "
"just-started generator";
Expand Down Expand Up @@ -452,6 +467,8 @@ gen_close(PyObject *self, PyObject *args)

int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state);
do {
gen_yield_from_lock_wait(gen, &frame_state);

if (frame_state == FRAME_CREATED) {
// && (1) to avoid -Wunreachable-code warning on Clang
if (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_CLEARED) && (1)) {
Expand Down Expand Up @@ -614,6 +631,8 @@ _gen_throw(PyGenObject *gen, int close_on_genexit,
{
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state);
do {
gen_yield_from_lock_wait(gen, &frame_state);

if (frame_state == FRAME_EXECUTING) {
gen_raise_already_executing_error(gen);
return NULL;
Expand Down Expand Up @@ -876,12 +895,25 @@ static PyObject *
gen_getyieldfrom(PyObject *self, void *Py_UNUSED(ignored))
{
    // Getter for `gi_yieldfrom`.
    //
    // Returns None unless the generator's frame is in the
    // FRAME_SUSPENDED_YIELD_FROM state; in that state it returns a new
    // reference to whatever _PyFrame_StackPeek() reports for the frame.
    //
    // `frame_state` is declared exactly once per build configuration; a
    // second declaration in the same scope would not compile.
    PyGenObject *gen = _PyGen_CAST(self);
#ifdef Py_GIL_DISABLED
    int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
    do {
        // Another reader may currently hold the lock; wait until it is gone.
        gen_yield_from_lock_wait(gen, &frame_state);
        if (frame_state != FRAME_SUSPENDED_YIELD_FROM) {
            Py_RETURN_NONE;
        }
        // Take the lock by moving the state to ..._LOCKED; retry from the
        // top if the transition does not succeed.
        // NOTE(review): presumably a compare-exchange that refreshes
        // `frame_state` on failure -- the free-threaded definition of
        // _Py_GEN_TRY_SET_FRAME_STATE is not visible here; confirm.
    } while (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_SUSPENDED_YIELD_FROM_LOCKED));

    // While the state is ..._LOCKED, gen_send_ex(), gen_close() and
    // _gen_throw() wait in gen_yield_from_lock_wait(), so the frame is not
    // resumed or cleared underneath this read.
    PyObject *result = PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe));
    // Unlock: restore the plain suspended state with release ordering.
    _Py_atomic_store_int8_release(&gen->gi_frame_state, FRAME_SUSPENDED_YIELD_FROM);
    return result;
#else
    int8_t frame_state = gen->gi_frame_state;
    if (frame_state != FRAME_SUSPENDED_YIELD_FROM) {
        Py_RETURN_NONE;
    }
    return PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe));
#endif
}


Expand Down
4 changes: 3 additions & 1 deletion Python/ceval.c
Original file line number Diff line number Diff line change
Expand Up @@ -3391,7 +3391,9 @@ _PyEval_GetAwaitable(PyObject *iterable, int oparg)
else if (PyCoro_CheckExact(iter)) {
PyCoroObject *coro = (PyCoroObject *)iter;
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(coro->cr_frame_state);
if (frame_state == FRAME_SUSPENDED_YIELD_FROM) {
if (frame_state == FRAME_SUSPENDED_YIELD_FROM ||
frame_state == FRAME_SUSPENDED_YIELD_FROM_LOCKED)
{
/* `iter` is a coroutine object that is being awaited. */
Py_CLEAR(iter);
_PyErr_SetString(PyThreadState_GET(), PyExc_RuntimeError,
Expand Down
5 changes: 4 additions & 1 deletion Python/ceval_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,19 +522,22 @@ gen_try_set_executing(PyGenObject *gen)
#ifdef Py_GIL_DISABLED
if (!_PyObject_IsUniquelyReferenced((PyObject *)gen)) {
int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
while (frame_state < FRAME_EXECUTING) {
while (frame_state < FRAME_SUSPENDED_YIELD_FROM_LOCKED) {
if (_Py_atomic_compare_exchange_int8(&gen->gi_frame_state,
&frame_state,
FRAME_EXECUTING)) {
return true;
}
}
// NB: We return false for FRAME_SUSPENDED_YIELD_FROM_LOCKED as well.
// That case is rare enough that we can just handle it in the deopt.
return false;
}
#endif
// Use faster non-atomic modifications in the GIL-enabled build and when
// the object is uniquely referenced in the free-threaded build.
if (gen->gi_frame_state < FRAME_EXECUTING) {
assert(gen->gi_frame_state != FRAME_SUSPENDED_YIELD_FROM_LOCKED);
gen->gi_frame_state = FRAME_EXECUTING;
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion Python/lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct mutex_entry {
int handed_off;
};

static void
void
_Py_yield(void)
{
#ifdef MS_WINDOWS
Expand Down
Loading