Commit e36eff2

timsaucer and claude committed
fix: make interrupt test reliable on Python 3.11
PyThreadState_SetAsyncExc only delivers exceptions when the thread is executing Python bytecode, not while in native (Rust/C) code. The previous test had two issues causing flakiness on Python 3.11:

1. The interrupt fired before df.collect() entered the UDF, while the thread was still in native code where async exceptions are ignored.
2. time.sleep(2.0) is a single C call where async exceptions are not checked; they're only checked between bytecode instructions.

Fix by adding a threading.Event so the interrupt waits until the UDF is actually executing Python code, and by sleeping in small increments so the eval loop has opportunities to check for pending exceptions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent fca0122 commit e36eff2

1 file changed

Lines changed: 19 additions & 2 deletions

python/tests/test_dataframe.py

@@ -3416,10 +3416,18 @@ def test_fill_null_all_null_column(ctx):
     assert result.column(1).to_pylist() == ["filled", "filled", "filled"]
 
 
+_slow_udf_started = threading.Event()
+
+
 @udf([pa.int64()], pa.int64(), "immutable")
 def slow_udf(x: pa.Array) -> pa.Array:
-    # This must be longer than the check interval in wait_for_future
-    time.sleep(2.0)
+    _slow_udf_started.set()
+    # Sleep in small increments so Python's eval loop checks for pending
+    # async exceptions (like KeyboardInterrupt via PyThreadState_SetAsyncExc)
+    # between iterations. A single long time.sleep() is a C call where async
+    # exceptions are not checked on all Python versions (notably 3.11).
+    for _ in range(200):
+        time.sleep(0.01)
     return x
 
 
@@ -3453,6 +3461,7 @@ def test_collect_or_stream_interrupted(slow_query, as_c_stream): # noqa: C901 P
     if as_c_stream:
         reader = pa.RecordBatchReader.from_stream(df)
 
+    _slow_udf_started.clear()
     read_started = threading.Event()
     read_exception = []
     read_thread_id = None
@@ -3464,6 +3473,14 @@ def trigger_interrupt():
                 msg = f"Read operation did not start within {max_wait_time} seconds"
                 raise RuntimeError(msg)
 
+        # For slow_query tests, wait until the UDF is actually executing Python
+        # bytecode before sending the interrupt. PyThreadState_SetAsyncExc only
+        # delivers exceptions when the thread is in the Python eval loop, not
+        # while in native (Rust/C) code.
+        if slow_query and not _slow_udf_started.wait(timeout=max_wait_time):
+            msg = f"UDF did not start within {max_wait_time} seconds"
+            raise RuntimeError(msg)
+
         if read_thread_id is None:
             msg = "Cannot get read thread ID"
             raise RuntimeError(msg)
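
The handshake-then-interrupt pattern in this change can be tried outside the test suite. The following is a minimal standalone sketch, not code from this repository: it assumes CPython (it reaches PyThreadState_SetAsyncExc through ctypes), and the names worker_started, interruptible_sleep, interrupt_thread, and run_demo are hypothetical. The worker sets an event once it is running Python code and then sleeps in short steps; the caller waits on that event before injecting KeyboardInterrupt.

```python
import ctypes
import threading
import time

# Hypothetical names for illustration; none of these come from the test file.
worker_started = threading.Event()
outcome = []


def interruptible_sleep(total_seconds: float, step: float = 0.01) -> None:
    """Sleep in short steps so control returns to the eval loop between sleeps."""
    deadline = time.monotonic() + total_seconds
    while time.monotonic() < deadline:
        time.sleep(step)


def worker() -> None:
    try:
        # Signal from inside the try block, so an interrupt that arrives
        # right after the event is set is still caught below.
        worker_started.set()
        interruptible_sleep(5.0)
        outcome.append("finished")
    except KeyboardInterrupt:
        outcome.append("interrupted")


def interrupt_thread(thread: threading.Thread) -> int:
    """Schedule KeyboardInterrupt in `thread`; returns the number of thread states modified."""
    return ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread.ident), ctypes.py_object(KeyboardInterrupt)
    )


def run_demo() -> str:
    worker_started.clear()
    outcome.clear()
    thread = threading.Thread(target=worker)
    thread.start()
    # Wait until the worker is executing Python code before interrupting it.
    if not worker_started.wait(timeout=5.0):
        raise RuntimeError("worker did not start within 5 seconds")
    if interrupt_thread(thread) != 1:
        raise RuntimeError("could not schedule the exception in the worker thread")
    thread.join(timeout=10.0)
    return outcome[0] if outcome else "no outcome"


if __name__ == "__main__":
    print(run_demo())
```

On CPython, run_demo() is expected to return "interrupted" well before the five-second sleep would have completed, because the pending exception is raised the next time the worker's loop returns to the interpreter.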
