-
Notifications
You must be signed in to change notification settings - Fork 239
refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture #1315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
7e93964
73e383c
a97837c
0a8745f
61e6d10
d327a2f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| Replace the fixed ``trio.sleep(settle_time)`` in the ``subscribed_mesh`` pubsub | ||
| test fixture with deterministic predicate-based polling using ``wait_for()``. | ||
| The fixture now accepts ``ready_timeout`` / ``poll_interval`` and waits until | ||
| every router's mesh for the topic has at least ``min(n - 1, router.degree_low)`` | ||
| peers before yielding. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,7 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import AsyncIterator | ||
| from collections.abc import AsyncIterator, Callable | ||
| from contextlib import asynccontextmanager | ||
| import dataclasses | ||
| from typing import Any | ||
|
|
@@ -15,6 +15,7 @@ | |
| from libp2p.pubsub.pubsub import Pubsub | ||
| from tests.utils.factories import PubsubFactory | ||
| from tests.utils.pubsub.utils import dense_connect | ||
| from tests.utils.pubsub.wait import wait_for | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True, slots=True) | ||
|
|
@@ -88,18 +89,54 @@ async def connected_gossipsub_nodes( | |
|
|
||
| @asynccontextmanager | ||
| async def subscribed_mesh( | ||
| topic: str, n: int, *, settle_time: float = 1.0, **kwargs: Any | ||
| topic: str, | ||
| n: int, | ||
| *, | ||
| ready_timeout: float = 5.0, | ||
| poll_interval: float = 0.02, | ||
| ready_predicate: Callable[[], bool] | None = None, | ||
| **kwargs: Any, | ||
| ) -> AsyncIterator[GossipSubHarness]: | ||
| """ | ||
| Create *n* connected GossipSub nodes all subscribed to *topic*. | ||
|
|
||
| Waits *settle_time* seconds for mesh formation before yielding. | ||
| Waits (up to *ready_timeout* seconds) for every router's mesh for | ||
| *topic* to contain at least ``min(n - 1, router.degree_low)`` peers | ||
| before yielding. This replaces the previous fixed-sleep wait with a | ||
| deterministic, predicate-driven poll (see #1307). | ||
|
|
||
| *ready_predicate* overrides the default mesh-readiness check; pass an | ||
| unsatisfiable predicate to exercise the timeout path deterministically | ||
| (the default cannot be made to time out reliably, since the mesh may | ||
| already be formed by the first poll). | ||
| """ | ||
| if ready_timeout <= 0: | ||
| raise ValueError(f"ready_timeout must be > 0, got {ready_timeout!r}") | ||
| if poll_interval <= 0: | ||
| raise ValueError(f"poll_interval must be > 0, got {poll_interval!r}") | ||
|
|
||
| async with connected_gossipsub_nodes(n, **kwargs) as harness: | ||
| for ps in harness.pubsubs: | ||
| await ps.subscribe(topic) | ||
| # TODO(#378): replace fixed sleep with predicate-based mesh-ready polling | ||
| await trio.sleep(settle_time) | ||
|
|
||
| routers = harness.routers | ||
|
|
||
| def _mesh_ready() -> bool: | ||
| for router in routers: | ||
| expected = min(n - 1, router.degree_low) | ||
| if len(router.mesh.get(topic, set())) < expected: | ||
| return False | ||
| return True | ||
|
|
||
| await wait_for( | ||
| ready_predicate or _mesh_ready, | ||
| timeout=ready_timeout, | ||
| poll_interval=poll_interval, | ||
|
Comment on lines
+131
to
+134
|
||
| fail_msg=( | ||
| f"mesh for topic {topic!r} did not form on all {n} routers " | ||
| f"within {ready_timeout}s" | ||
| ), | ||
| ) | ||
| yield harness | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| """ | ||
| Regression tests for the `subscribed_mesh` fixture's predicate-based | ||
| readiness wait (#1307). | ||
|
|
||
| Covers that, after `subscribed_mesh` yields, every router's mesh for the | ||
| topic already contains the expected peers, so test bodies can rely on | ||
| mesh state without an additional sleep. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import sys | ||
|
|
||
| import pytest | ||
|
|
||
| # Python 3.10 doesn't have BaseExceptionGroup as a builtin; fall back to the | ||
| # exceptiongroup backport the rest of this repo already depends on. | ||
| if sys.version_info < (3, 11): # pragma: no cover | ||
| from exceptiongroup import BaseExceptionGroup # type: ignore[assignment] | ||
|
|
||
| from tests.core.pubsub.conftest import subscribed_mesh | ||
|
|
||
| TOPIC = "test-mesh-ready" | ||
|
|
||
|
|
||
| @pytest.mark.trio | ||
| async def test_subscribed_mesh_yields_with_every_router_grafted() -> None: | ||
| """Each router's mesh[topic] has at least min(n-1, degree_low) peers.""" | ||
| n = 3 | ||
| async with subscribed_mesh(TOPIC, n) as harness: | ||
| for router in harness.routers: | ||
| expected = min(n - 1, router.degree_low) | ||
| assert len(router.mesh.get(TOPIC, set())) >= expected, ( | ||
| f"router mesh for {TOPIC!r} has " | ||
| f"{len(router.mesh.get(TOPIC, set()))} peers, expected >= {expected}" | ||
| ) | ||
|
|
||
|
|
||
| def _flatten(exc: BaseException) -> list[BaseException]: | ||
| """Recursively walk nested ExceptionGroups, returning the leaf exceptions.""" | ||
| if isinstance(exc, BaseExceptionGroup): | ||
| out: list[BaseException] = [] | ||
| for child in exc.exceptions: | ||
| out.extend(_flatten(child)) | ||
| return out | ||
| return [exc] | ||
|
Comment on lines
+39
to
+46
|
||
|
|
||
|
|
||
| @pytest.mark.trio | ||
| async def test_subscribed_mesh_rejects_unreachable_readiness() -> None: | ||
| """ | ||
| Unreachable readiness surfaces a TimeoutError, not a quiet yield. | ||
|
|
||
| Uses an unsatisfiable readiness predicate so the timeout path is exercised | ||
| deterministically — relying on a tiny ``ready_timeout`` is racy, since the | ||
| mesh may already be formed by the first poll (``wait_for`` checks the | ||
| predicate before the deadline), in which case the fixture yields instead. | ||
| Trio may or may not wrap the TimeoutError in an ExceptionGroup depending on | ||
| nursery strictness, so catch broadly and assert on the flattened leaves. | ||
| """ | ||
| raised: BaseException | None = None | ||
| try: | ||
| async with subscribed_mesh( | ||
| TOPIC, 3, ready_predicate=lambda: False, ready_timeout=0.05 | ||
| ): | ||
| pytest.fail("subscribed_mesh should have timed out before yielding") | ||
| except BaseException as exc: | ||
| raised = exc | ||
|
|
||
| assert raised is not None, "expected subscribed_mesh to raise" | ||
| leaves = _flatten(raised) | ||
| timeouts = [e for e in leaves if isinstance(e, TimeoutError)] | ||
| assert timeouts, f"expected a TimeoutError leaf, got {leaves!r}" | ||
| assert "mesh for topic" in str(timeouts[0]) | ||
|
|
||
|
|
||
| @pytest.mark.trio | ||
| async def test_subscribed_mesh_rejects_non_positive_timing() -> None: | ||
| """ready_timeout and poll_interval must be strictly positive.""" | ||
| for bad in (0, -0.5): | ||
| with pytest.raises(ValueError, match="ready_timeout"): | ||
| async with subscribed_mesh(TOPIC, 2, ready_timeout=bad): | ||
| pytest.fail("validation should have rejected the argument") | ||
| with pytest.raises(ValueError, match="poll_interval"): | ||
| async with subscribed_mesh(TOPIC, 2, poll_interval=bad): | ||
| pytest.fail("validation should have rejected the argument") | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since
poll_intervalis now a public knob on this helper, it should be validated (e.g., must be > 0) to avoid accidental busy-looping or surprising behavior if a caller passes 0 or a negative value. A small explicit check with a clear error message would make failures easier to diagnose.