From 7e93964cead2efccc15bbb9132e4de79deede3b5 Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Wed, 22 Apr 2026 22:57:09 +0530 Subject: [PATCH 1/3] refactor(pubsub-test): predicate-based readiness in subscribed_mesh The subscribed_mesh fixture used a fixed trio.sleep(settle_time) after every node subscribed, which is timing-dependent and either flaky or unnecessarily slow. Replace the sleep with a wait_for() poll that waits until every router's mesh[topic] has at least min(n-1, router.degree_low) peers before yielding. The fixture now exposes ready_timeout (default 5s) and poll_interval (default 20ms) instead of the old settle_time knob. Adds regression tests covering the ready case and the timeout case. No external callers of subscribed_mesh exist yet, so this is a safe rename. Fixes #1307. --- newsfragments/1307.internal.rst | 5 ++ tests/core/pubsub/conftest.py | 34 ++++++++++-- .../pubsub/test_subscribed_mesh_fixture.py | 55 +++++++++++++++++++ 3 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 newsfragments/1307.internal.rst create mode 100644 tests/core/pubsub/test_subscribed_mesh_fixture.py diff --git a/newsfragments/1307.internal.rst b/newsfragments/1307.internal.rst new file mode 100644 index 000000000..a93eb2e32 --- /dev/null +++ b/newsfragments/1307.internal.rst @@ -0,0 +1,5 @@ +Replace the fixed ``trio.sleep(settle_time)`` in the ``subscribed_mesh`` pubsub +test fixture with deterministic predicate-based polling using ``wait_for()``. +The fixture now accepts ``ready_timeout`` / ``poll_interval`` and waits until +every router's mesh for the topic has at least ``min(n - 1, router.degree_low)`` +peers before yielding. diff --git a/tests/core/pubsub/conftest.py b/tests/core/pubsub/conftest.py index 1c889b349..08c7fe0f5 100644 --- a/tests/core/pubsub/conftest.py +++ b/tests/core/pubsub/conftest.py @@ -15,6 +15,7 @@ from libp2p.pubsub.pubsub import Pubsub from tests.utils.factories import PubsubFactory from tests.utils.pubsub.utils import dense_connect +from tests.utils.pubsub.wait import wait_for @dataclasses.dataclass(frozen=True, slots=True) @@ -88,18 +89,43 @@ async def connected_gossipsub_nodes( @asynccontextmanager async def subscribed_mesh( - topic: str, n: int, *, settle_time: float = 1.0, **kwargs: Any + topic: str, + n: int, + *, + ready_timeout: float = 5.0, + poll_interval: float = 0.02, + **kwargs: Any, ) -> AsyncIterator[GossipSubHarness]: """ Create *n* connected GossipSub nodes all subscribed to *topic*. - Waits *settle_time* seconds for mesh formation before yielding. + Waits (up to *ready_timeout* seconds) for every router's mesh for + *topic* to contain at least ``min(n - 1, router.degree_low)`` peers + before yielding. This replaces the previous fixed-sleep wait with a + deterministic, predicate-driven poll (see #1307). """ async with connected_gossipsub_nodes(n, **kwargs) as harness: for ps in harness.pubsubs: await ps.subscribe(topic) - # TODO(#378): replace fixed sleep with predicate-based mesh-ready polling - await trio.sleep(settle_time) + + routers = harness.routers + + def _mesh_ready() -> bool: + for router in routers: + expected = min(n - 1, router.degree_low) + if len(router.mesh.get(topic, set())) < expected: + return False + return True + + await wait_for( + _mesh_ready, + timeout=ready_timeout, + poll_interval=poll_interval, + fail_msg=( + f"mesh for topic {topic!r} did not form on all {n} routers " + f"within {ready_timeout}s" + ), + ) yield harness diff --git a/tests/core/pubsub/test_subscribed_mesh_fixture.py b/tests/core/pubsub/test_subscribed_mesh_fixture.py new file mode 100644 index 000000000..1ae683d38 --- /dev/null +++ b/tests/core/pubsub/test_subscribed_mesh_fixture.py @@ -0,0 +1,55 @@ +""" +Regression tests for the `subscribed_mesh` fixture's predicate-based +readiness wait (#1307). + +Covers that, after `subscribed_mesh` yields, every router's mesh for the +topic already contains the expected peers, so test bodies can rely on +mesh state without an additional sleep. +""" + +from __future__ import annotations + +import pytest + +from tests.core.pubsub.conftest import subscribed_mesh + +TOPIC = "test-mesh-ready" + + +@pytest.mark.trio +async def test_subscribed_mesh_yields_with_every_router_grafted() -> None: + """Each router's mesh[topic] has at least min(n-1, degree_low) peers.""" + n = 3 + async with subscribed_mesh(TOPIC, n) as harness: + for router in harness.routers: + expected = min(n - 1, router.degree_low) + assert len(router.mesh.get(TOPIC, set())) >= expected, ( + f"router mesh for {TOPIC!r} has " + f"{len(router.mesh.get(TOPIC, set()))} peers, expected >= {expected}" + ) + + +def _flatten(exc: BaseException) -> list[BaseException]: + """Recursively walk nested ExceptionGroups, returning the leaf exceptions.""" + if isinstance(exc, BaseExceptionGroup): + out: list[BaseException] = [] + for child in exc.exceptions: + out.extend(_flatten(child)) + return out + return [exc] + + +@pytest.mark.trio +async def test_subscribed_mesh_rejects_unreachable_readiness() -> None: + """A ready_timeout that's too short surfaces a TimeoutError, not a quiet sleep.""" + # Ask for an absurdly short timeout; the mesh can't form in 1ms. + # Trio's background service managers nest ExceptionGroups, so flatten + # the whole tree and confirm the TimeoutError is a leaf. + with pytest.raises(BaseExceptionGroup) as exc_info: + async with subscribed_mesh(TOPIC, 3, ready_timeout=0.001): + pytest.fail("subscribed_mesh should have timed out before yielding") + + leaves = _flatten(exc_info.value) + timeouts = [e for e in leaves if isinstance(e, TimeoutError)] + assert timeouts, f"expected a TimeoutError leaf, got {leaves!r}" + assert "mesh for topic" in str(timeouts[0]) From 73e383ccbee4adc2541ee43881b537ef5b118c98 Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Thu, 23 Apr 2026 18:25:38 +0530 Subject: [PATCH 2/3] address Copilot review on #1315 - Validate ready_timeout and poll_interval are > 0 in subscribed_mesh so a zero/negative value surfaces as ValueError instead of busy-looping or silently timing out - Make the timeout-path test tolerant of both wrapped and unwrapped TimeoutError shapes (nursery strictness varies across upstream CMs) - Add a test covering the new argument validation - Support the exceptiongroup backport on Python 3.10, matching the rest of the repo's compat pattern --- tests/core/pubsub/conftest.py | 5 +++ .../pubsub/test_subscribed_mesh_fixture.py | 32 ++++++++++++++++--- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/tests/core/pubsub/conftest.py b/tests/core/pubsub/conftest.py index 08c7fe0f5..9eca16882 100644 --- a/tests/core/pubsub/conftest.py +++ b/tests/core/pubsub/conftest.py @@ -104,6 +104,11 @@ async def subscribed_mesh( before yielding. This replaces the previous fixed-sleep wait with a deterministic, predicate-driven poll (see #1307). """ + if ready_timeout <= 0: + raise ValueError(f"ready_timeout must be > 0, got {ready_timeout!r}") + if poll_interval <= 0: + raise ValueError(f"poll_interval must be > 0, got {poll_interval!r}") + async with connected_gossipsub_nodes(n, **kwargs) as harness: for ps in harness.pubsubs: await ps.subscribe(topic) diff --git a/tests/core/pubsub/test_subscribed_mesh_fixture.py b/tests/core/pubsub/test_subscribed_mesh_fixture.py index 1ae683d38..c1f9d753c 100644 --- a/tests/core/pubsub/test_subscribed_mesh_fixture.py +++ b/tests/core/pubsub/test_subscribed_mesh_fixture.py @@ -9,8 +9,15 @@ from __future__ import annotations +import sys + import pytest +# Python 3.10 doesn't have BaseExceptionGroup as a builtin; fall back to the +# exceptiongroup backport the rest of this repo already depends on. +if sys.version_info < (3, 11): # pragma: no cover + from exceptiongroup import BaseExceptionGroup # type: ignore[assignment] + from tests.core.pubsub.conftest import subscribed_mesh TOPIC = "test-mesh-ready" @@ -43,13 +50,30 @@ def _flatten(exc: BaseException) -> list[BaseException]: async def test_subscribed_mesh_rejects_unreachable_readiness() -> None: """A ready_timeout that's too short surfaces a TimeoutError, not a quiet sleep.""" # Ask for an absurdly short timeout; the mesh can't form in 1ms. - # Trio's background service managers nest ExceptionGroups, so flatten - # the whole tree and confirm the TimeoutError is a leaf. - with pytest.raises(BaseExceptionGroup) as exc_info: + # Trio may or may not wrap the TimeoutError in an ExceptionGroup depending + # on the nursery strictness of upstream context managers, so catch broadly + # and then assert on the flattened leaves. + raised: BaseException | None = None + try: async with subscribed_mesh(TOPIC, 3, ready_timeout=0.001): pytest.fail("subscribed_mesh should have timed out before yielding") + except BaseException as exc: + raised = exc - leaves = _flatten(exc_info.value) + assert raised is not None, "expected subscribed_mesh to raise" + leaves = _flatten(raised) timeouts = [e for e in leaves if isinstance(e, TimeoutError)] assert timeouts, f"expected a TimeoutError leaf, got {leaves!r}" assert "mesh for topic" in str(timeouts[0]) + + +@pytest.mark.trio +async def test_subscribed_mesh_rejects_non_positive_timing() -> None: + """ready_timeout and poll_interval must be strictly positive.""" + for bad in (0, -0.5): + with pytest.raises(ValueError, match="ready_timeout"): + async with subscribed_mesh(TOPIC, 2, ready_timeout=bad): + pytest.fail("validation should have rejected the argument") + with pytest.raises(ValueError, match="poll_interval"): + async with subscribed_mesh(TOPIC, 2, poll_interval=bad): + pytest.fail("validation should have rejected the argument") From d327a2f942e095908feca216499ed91ae9d4496d Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Fri, 19 Jun 2026 13:05:01 +0530 Subject: [PATCH 3/3] test(pubsub): make subscribed_mesh timeout test deterministic test_subscribed_mesh_rejects_unreachable_readiness relied on a 1ms ready_timeout to force a TimeoutError, but wait_for checks the readiness predicate before the deadline. When the gossipsub mesh was already grafted by the first poll, the fixture yielded instead of timing out, so the test failed on faster runners (linux and windows, 3.12 core). Add an optional ready_predicate hook to the subscribed_mesh fixture and have the test inject an unsatisfiable predicate, exercising the timeout path deterministically regardless of mesh-formation timing. Verified stable over repeated runs. --- tests/core/pubsub/conftest.py | 10 ++++++++-- .../pubsub/test_subscribed_mesh_fixture.py | 19 +++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/tests/core/pubsub/conftest.py b/tests/core/pubsub/conftest.py index 9eca16882..39a55a69c 100644 --- a/tests/core/pubsub/conftest.py +++ b/tests/core/pubsub/conftest.py @@ -2,7 +2,7 @@ from __future__ import annotations -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Callable from contextlib import asynccontextmanager import dataclasses from typing import Any @@ -94,6 +94,7 @@ async def subscribed_mesh( *, ready_timeout: float = 5.0, poll_interval: float = 0.02, + ready_predicate: Callable[[], bool] | None = None, **kwargs: Any, ) -> AsyncIterator[GossipSubHarness]: """ @@ -103,6 +104,11 @@ async def subscribed_mesh( *topic* to contain at least ``min(n - 1, router.degree_low)`` peers before yielding. This replaces the previous fixed-sleep wait with a deterministic, predicate-driven poll (see #1307). + + *ready_predicate* overrides the default mesh-readiness check; pass an + unsatisfiable predicate to exercise the timeout path deterministically + (the default cannot be made to time out reliably, since the mesh may + already be formed by the first poll). """ if ready_timeout <= 0: raise ValueError(f"ready_timeout must be > 0, got {ready_timeout!r}") @@ -123,7 +129,7 @@ def _mesh_ready() -> bool: return True await wait_for( - _mesh_ready, + ready_predicate or _mesh_ready, timeout=ready_timeout, poll_interval=poll_interval, fail_msg=( diff --git a/tests/core/pubsub/test_subscribed_mesh_fixture.py b/tests/core/pubsub/test_subscribed_mesh_fixture.py index c1f9d753c..93263733c 100644 --- a/tests/core/pubsub/test_subscribed_mesh_fixture.py +++ b/tests/core/pubsub/test_subscribed_mesh_fixture.py @@ -48,14 +48,21 @@ def _flatten(exc: BaseException) -> list[BaseException]: @pytest.mark.trio async def test_subscribed_mesh_rejects_unreachable_readiness() -> None: - """A ready_timeout that's too short surfaces a TimeoutError, not a quiet sleep.""" - # Ask for an absurdly short timeout; the mesh can't form in 1ms. - # Trio may or may not wrap the TimeoutError in an ExceptionGroup depending - # on the nursery strictness of upstream context managers, so catch broadly - # and then assert on the flattened leaves. + """ + Unreachable readiness surfaces a TimeoutError, not a quiet yield. + + Uses an unsatisfiable readiness predicate so the timeout path is exercised + deterministically — relying on a tiny ``ready_timeout`` is racy, since the + mesh may already be formed by the first poll (``wait_for`` checks the + predicate before the deadline), in which case the fixture yields instead. + Trio may or may not wrap the TimeoutError in an ExceptionGroup depending on + nursery strictness, so catch broadly and assert on the flattened leaves. + """ raised: BaseException | None = None try: - async with subscribed_mesh(TOPIC, 3, ready_timeout=0.001): + async with subscribed_mesh( + TOPIC, 3, ready_predicate=lambda: False, ready_timeout=0.05 + ): pytest.fail("subscribed_mesh should have timed out before yielding") except BaseException as exc: raised = exc