diff --git a/newsfragments/1307.internal.rst b/newsfragments/1307.internal.rst
new file mode 100644
index 000000000..a93eb2e32
--- /dev/null
+++ b/newsfragments/1307.internal.rst
@@ -0,0 +1,6 @@
+Replace the fixed ``trio.sleep(settle_time)`` in the ``subscribed_mesh`` pubsub
+test fixture with deterministic predicate-based polling using ``wait_for()``.
+The fixture now accepts ``ready_timeout`` / ``poll_interval`` (plus an optional
+``ready_predicate`` override) in place of ``settle_time`` and waits until every
+router's mesh for the topic has at least ``min(n - 1, router.degree_low)``
+peers before yielding.
diff --git a/tests/core/pubsub/conftest.py b/tests/core/pubsub/conftest.py
index 1c889b349..39a55a69c 100644
--- a/tests/core/pubsub/conftest.py
+++ b/tests/core/pubsub/conftest.py
@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-from collections.abc import AsyncIterator
+from collections.abc import AsyncIterator, Callable
 from contextlib import asynccontextmanager
 import dataclasses
 from typing import Any
@@ -15,6 +15,7 @@
 from libp2p.pubsub.pubsub import Pubsub
 from tests.utils.factories import PubsubFactory
 from tests.utils.pubsub.utils import dense_connect
+from tests.utils.pubsub.wait import wait_for
 
 
 @dataclasses.dataclass(frozen=True, slots=True)
@@ -88,18 +89,67 @@ async def connected_gossipsub_nodes(
 
 @asynccontextmanager
 async def subscribed_mesh(
-    topic: str, n: int, *, settle_time: float = 1.0, **kwargs: Any
+    topic: str,
+    n: int,
+    *,
+    ready_timeout: float = 5.0,
+    poll_interval: float = 0.02,
+    ready_predicate: Callable[[], bool] | None = None,
+    **kwargs: Any,
 ) -> AsyncIterator[GossipSubHarness]:
     """
     Create *n* connected GossipSub nodes all subscribed to *topic*.
 
-    Waits *settle_time* seconds for mesh formation before yielding.
+    Waits (up to *ready_timeout* seconds) for every router's mesh for
+    *topic* to contain at least ``min(n - 1, router.degree_low)`` peers
+    before yielding. This replaces the previous fixed-sleep wait with a
+    deterministic, predicate-driven poll (see #1307).
+
+    *ready_predicate* overrides the default mesh-readiness check; pass an
+    unsatisfiable predicate to exercise the timeout path deterministically
+    (the default cannot be made to time out reliably, since the mesh may
+    already be formed by the first poll).
+
+    :param topic: topic every node subscribes to.
+    :param n: number of nodes to create.
+    :param ready_timeout: timeout handed to ``wait_for``; must be > 0.
+    :param poll_interval: poll interval handed to ``wait_for``; must be > 0.
+    :param ready_predicate: optional zero-argument check used instead of the
+        default mesh check.
+    :param kwargs: forwarded to ``connected_gossipsub_nodes``.
+    :raises ValueError: if *ready_timeout* or *poll_interval* is not > 0.
+    :raises TimeoutError: expected from ``wait_for`` when readiness is not
+        reached in time (Trio may wrap it in an exception group).
     """
+    if ready_timeout <= 0:
+        raise ValueError(f"ready_timeout must be > 0, got {ready_timeout!r}")
+    if poll_interval <= 0:
+        raise ValueError(f"poll_interval must be > 0, got {poll_interval!r}")
+
     async with connected_gossipsub_nodes(n, **kwargs) as harness:
         for ps in harness.pubsubs:
             await ps.subscribe(topic)
-        # TODO(#378): replace fixed sleep with predicate-based mesh-ready polling
-        await trio.sleep(settle_time)
+
+        routers = harness.routers
+
+        def _mesh_ready() -> bool:
+            # Ready once every router has enough mesh peers for the topic.
+            return all(
+                len(router.mesh.get(topic, set())) >= min(n - 1, router.degree_low)
+                for router in routers
+            )
+
+        # Select on ``is None`` rather than truthiness so a falsy callable works.
+        predicate = _mesh_ready if ready_predicate is None else ready_predicate
+        await wait_for(
+            predicate,
+            timeout=ready_timeout,
+            poll_interval=poll_interval,
+            fail_msg=(
+                f"mesh for topic {topic!r} did not form on all {n} routers "
+                f"within {ready_timeout}s"
+            ),
+        )
         yield harness
 
 
diff --git a/tests/core/pubsub/test_subscribed_mesh_fixture.py b/tests/core/pubsub/test_subscribed_mesh_fixture.py
new file mode 100644
index 000000000..93263733c
--- /dev/null
+++ b/tests/core/pubsub/test_subscribed_mesh_fixture.py
@@ -0,0 +1,90 @@
+"""
+Regression tests for the `subscribed_mesh` fixture's predicate-based
+readiness wait (#1307).
+
+Covers that, after `subscribed_mesh` yields, every router's mesh for the
+topic already contains the expected peers, so test bodies can rely on
+mesh state without an additional sleep.
+"""
+
+from __future__ import annotations
+
+import sys
+
+import pytest
+
+# Python 3.10 doesn't have BaseExceptionGroup as a builtin; fall back to the
+# exceptiongroup backport the rest of this repo already depends on.
+if sys.version_info < (3, 11):  # pragma: no cover
+    from exceptiongroup import BaseExceptionGroup  # type: ignore[assignment]
+
+from tests.core.pubsub.conftest import subscribed_mesh
+
+TOPIC = "test-mesh-ready"
+
+
+@pytest.mark.trio
+async def test_subscribed_mesh_yields_with_every_router_grafted() -> None:
+    """Each router's mesh[topic] has at least min(n-1, degree_low) peers."""
+    n = 3
+    async with subscribed_mesh(TOPIC, n) as harness:
+        for router in harness.routers:
+            expected = min(n - 1, router.degree_low)
+            assert len(router.mesh.get(TOPIC, set())) >= expected, (
+                f"router mesh for {TOPIC!r} has "
+                f"{len(router.mesh.get(TOPIC, set()))} peers, expected >= {expected}"
+            )
+
+
+def _flatten(exc: BaseException) -> list[BaseException]:
+    """Recursively walk nested ExceptionGroups, returning the leaf exceptions."""
+    if isinstance(exc, BaseExceptionGroup):
+        out: list[BaseException] = []
+        for child in exc.exceptions:
+            out.extend(_flatten(child))
+        return out
+    return [exc]
+
+
+@pytest.mark.trio
+async def test_subscribed_mesh_rejects_unreachable_readiness() -> None:
+    """
+    Unreachable readiness surfaces a TimeoutError, not a quiet yield.
+
+    Uses an unsatisfiable readiness predicate so the timeout path is exercised
+    deterministically — relying on a tiny ``ready_timeout`` is racy, since the
+    mesh may already be formed by the first poll (``wait_for`` checks the
+    predicate before the deadline), in which case the fixture yields instead.
+    Trio may or may not wrap the TimeoutError in an ExceptionGroup depending on
+    nursery strictness, so catch both shapes and assert on the flattened leaves.
+    """
+    raised: BaseException | None = None
+    yielded = False
+    # Deliberately not ``except BaseException``: that would also swallow
+    # KeyboardInterrupt, trio.Cancelled and pytest's own outcome exceptions.
+    try:
+        async with subscribed_mesh(
+            TOPIC, 3, ready_predicate=lambda: False, ready_timeout=0.05
+        ):
+            yielded = True
+    except (Exception, BaseExceptionGroup) as exc:
+        raised = exc
+
+    assert not yielded, "subscribed_mesh should have timed out before yielding"
+    assert raised is not None, "expected subscribed_mesh to raise"
+    leaves = _flatten(raised)
+    timeouts = [e for e in leaves if isinstance(e, TimeoutError)]
+    assert timeouts, f"expected a TimeoutError leaf, got {leaves!r}"
+    assert "mesh for topic" in str(timeouts[0])
+
+
+@pytest.mark.trio
+async def test_subscribed_mesh_rejects_non_positive_timing() -> None:
+    """ready_timeout and poll_interval must be strictly positive."""
+    for bad in (0, -0.5):
+        with pytest.raises(ValueError, match="ready_timeout"):
+            async with subscribed_mesh(TOPIC, 2, ready_timeout=bad):
+                pytest.fail("validation should have rejected the argument")
+        with pytest.raises(ValueError, match="poll_interval"):
+            async with subscribed_mesh(TOPIC, 2, poll_interval=bad):
+                pytest.fail("validation should have rejected the argument")