Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions newsfragments/1307.internal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Replace the fixed ``trio.sleep(settle_time)`` in the ``subscribed_mesh`` pubsub
test fixture with deterministic predicate-based polling using ``wait_for()``.
The fixture now accepts ``ready_timeout`` / ``poll_interval`` and waits until
every router's mesh for the topic has at least ``min(n - 1, router.degree_low)``
peers before yielding.
47 changes: 42 additions & 5 deletions tests/core/pubsub/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager
import dataclasses
from typing import Any
Expand All @@ -15,6 +15,7 @@
from libp2p.pubsub.pubsub import Pubsub
from tests.utils.factories import PubsubFactory
from tests.utils.pubsub.utils import dense_connect
from tests.utils.pubsub.wait import wait_for


@dataclasses.dataclass(frozen=True, slots=True)
Expand Down Expand Up @@ -88,18 +89,54 @@ async def connected_gossipsub_nodes(

@asynccontextmanager
async def subscribed_mesh(
topic: str, n: int, *, settle_time: float = 1.0, **kwargs: Any
topic: str,
n: int,
*,
ready_timeout: float = 5.0,
poll_interval: float = 0.02,
Comment on lines +95 to +96

Copilot AI Apr 22, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since poll_interval is now a public knob on this helper, it should be validated (e.g., must be > 0) to avoid accidental busy-looping or surprising behavior if a caller passes 0 or a negative value. A small explicit check with a clear error message would make failures easier to diagnose.

Copilot uses AI. Check for mistakes.
ready_predicate: Callable[[], bool] | None = None,
**kwargs: Any,
) -> AsyncIterator[GossipSubHarness]:
"""
Create *n* connected GossipSub nodes all subscribed to *topic*.

Waits *settle_time* seconds for mesh formation before yielding.
Waits (up to *ready_timeout* seconds) for every router's mesh for
*topic* to contain at least ``min(n - 1, router.degree_low)`` peers
before yielding. This replaces the previous fixed-sleep wait with a
deterministic, predicate-driven poll (see #1307).

*ready_predicate* overrides the default mesh-readiness check; pass an
unsatisfiable predicate to exercise the timeout path deterministically
(the default cannot be made to time out reliably, since the mesh may
already be formed by the first poll).
"""
if ready_timeout <= 0:
raise ValueError(f"ready_timeout must be > 0, got {ready_timeout!r}")
if poll_interval <= 0:
raise ValueError(f"poll_interval must be > 0, got {poll_interval!r}")

async with connected_gossipsub_nodes(n, **kwargs) as harness:
for ps in harness.pubsubs:
await ps.subscribe(topic)
# TODO(#378): replace fixed sleep with predicate-based mesh-ready polling
await trio.sleep(settle_time)

routers = harness.routers

def _mesh_ready() -> bool:
for router in routers:
expected = min(n - 1, router.degree_low)
if len(router.mesh.get(topic, set())) < expected:
return False
return True

await wait_for(
ready_predicate or _mesh_ready,
timeout=ready_timeout,
poll_interval=poll_interval,
Comment on lines +131 to +134

Copilot AI Apr 22, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since poll_interval is now a public knob on this helper, it should be validated (e.g., must be > 0) to avoid accidental busy-looping or surprising behavior if a caller passes 0 or a negative value. A small explicit check with a clear error message would make failures easier to diagnose.

Copilot uses AI. Check for mistakes.
fail_msg=(
f"mesh for topic {topic!r} did not form on all {n} routers "
f"within {ready_timeout}s"
),
)
yield harness


Expand Down
86 changes: 86 additions & 0 deletions tests/core/pubsub/test_subscribed_mesh_fixture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""
Regression tests for the `subscribed_mesh` fixture's predicate-based
readiness wait (#1307).

Covers that, after `subscribed_mesh` yields, every router's mesh for the
topic already contains the expected peers, so test bodies can rely on
mesh state without an additional sleep.
"""

from __future__ import annotations

import sys

import pytest

# Python 3.10 doesn't have BaseExceptionGroup as a builtin; fall back to the
# exceptiongroup backport the rest of this repo already depends on.
if sys.version_info < (3, 11): # pragma: no cover
from exceptiongroup import BaseExceptionGroup # type: ignore[assignment]

from tests.core.pubsub.conftest import subscribed_mesh

TOPIC = "test-mesh-ready"


@pytest.mark.trio
async def test_subscribed_mesh_yields_with_every_router_grafted() -> None:
"""Each router's mesh[topic] has at least min(n-1, degree_low) peers."""
n = 3
async with subscribed_mesh(TOPIC, n) as harness:
for router in harness.routers:
expected = min(n - 1, router.degree_low)
assert len(router.mesh.get(TOPIC, set())) >= expected, (
f"router mesh for {TOPIC!r} has "
f"{len(router.mesh.get(TOPIC, set()))} peers, expected >= {expected}"
)


def _flatten(exc: BaseException) -> list[BaseException]:
"""Recursively walk nested ExceptionGroups, returning the leaf exceptions."""
if isinstance(exc, BaseExceptionGroup):
out: list[BaseException] = []
for child in exc.exceptions:
out.extend(_flatten(child))
return out
return [exc]
Comment on lines +39 to +46

Copilot AI Apr 22, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribed_mesh(..., ready_timeout=...) timing out may raise TimeoutError directly (not necessarily wrapped in a BaseExceptionGroup), depending on how wait_for() and the surrounding Trio context are implemented. This test is currently too strict and can fail even when behavior is correct. Recommend accepting either: (a) a direct TimeoutError, or (b) an exception group containing a TimeoutError leaf. Also note that referencing BaseExceptionGroup hard-requires Python 3.11+ at import time; if the project test matrix includes <3.11, this module will fail collection—consider guarding with pytest.importorskip, a compatibility shim, or structuring the test to not reference BaseExceptionGroup unconditionally.

Copilot uses AI. Check for mistakes.


@pytest.mark.trio
async def test_subscribed_mesh_rejects_unreachable_readiness() -> None:
"""
Unreachable readiness surfaces a TimeoutError, not a quiet yield.

Uses an unsatisfiable readiness predicate so the timeout path is exercised
deterministically — relying on a tiny ``ready_timeout`` is racy, since the
mesh may already be formed by the first poll (``wait_for`` checks the
predicate before the deadline), in which case the fixture yields instead.
Trio may or may not wrap the TimeoutError in an ExceptionGroup depending on
nursery strictness, so catch broadly and assert on the flattened leaves.
"""
raised: BaseException | None = None
try:
async with subscribed_mesh(
TOPIC, 3, ready_predicate=lambda: False, ready_timeout=0.05
):
pytest.fail("subscribed_mesh should have timed out before yielding")
except BaseException as exc:
raised = exc

assert raised is not None, "expected subscribed_mesh to raise"
leaves = _flatten(raised)
timeouts = [e for e in leaves if isinstance(e, TimeoutError)]
assert timeouts, f"expected a TimeoutError leaf, got {leaves!r}"
assert "mesh for topic" in str(timeouts[0])


@pytest.mark.trio
async def test_subscribed_mesh_rejects_non_positive_timing() -> None:
"""ready_timeout and poll_interval must be strictly positive."""
for bad in (0, -0.5):
with pytest.raises(ValueError, match="ready_timeout"):
async with subscribed_mesh(TOPIC, 2, ready_timeout=bad):
pytest.fail("validation should have rejected the argument")
with pytest.raises(ValueError, match="poll_interval"):
async with subscribed_mesh(TOPIC, 2, poll_interval=bad):
pytest.fail("validation should have rejected the argument")
Loading