Skip to content

refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture#1315

Open
yashksaini-coder wants to merge 6 commits into
libp2p:mainfrom
yashksaini-coder:refactor/1307-subscribed-mesh-predicate
Open

refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture#1315
yashksaini-coder wants to merge 6 commits into
libp2p:mainfrom
yashksaini-coder:refactor/1307-subscribed-mesh-predicate

Conversation

@yashksaini-coder

Copy link
Copy Markdown
Contributor

Summary

Fixes #1307 — replaces the fixed trio.sleep(settle_time) in the subscribed_mesh pubsub test fixture with deterministic, predicate-based polling using the wait_for() helper introduced in #1298.

Context

PR #1298 (PR 1 of 6 for #378) introduced subscribed_mesh(), which waited a fixed 1s for mesh formation:

# tests/core/pubsub/conftest.py — before
for ps in harness.pubsubs:
    await ps.subscribe(topic)
# TODO(#378): replace fixed sleep with predicate-based mesh-ready polling
await trio.sleep(settle_time)
yield harness

Both @acul71 and @IronJam11 flagged this as timing-dependent. This PR resolves that TODO.

Approach

Use the existing wait_for(predicate, timeout=..., poll_interval=...) helper to poll until every router's mesh[topic] has at least min(n - 1, router.degree_low) peers. This lines up with GossipSub's mesh degree bounds: for small fan-outs (n ≤ degree_low + 1) every subscriber ends up in every other subscriber's mesh; for larger n, each router reaches its degree floor.

def _mesh_ready() -> bool:
    for router in routers:
        expected = min(n - 1, router.degree_low)
        if len(router.mesh.get(topic, set())) < expected:
            return False
    return True

await wait_for(_mesh_ready, timeout=ready_timeout, poll_interval=poll_interval, ...)

The fixture signature swaps the settle_time: float = 1.0 knob for ready_timeout: float = 5.0 and poll_interval: float = 0.02. There are no external callers of subscribed_mesh yet (PR #1298 landed the fixture but nothing consumes it), so this is a safe rename.

Files changed

  • tests/core/pubsub/conftest.py — replace fixed sleep with wait_for poll, remove TODO(#378) comment
  • tests/core/pubsub/test_subscribed_mesh_fixture.py — new regression tests for both the happy path and the timeout path
  • newsfragments/1307.internal.rst — internal changelog entry

Testing

  • pytest tests/core/pubsub/test_subscribed_mesh_fixture.py — 2 passed (new)
  • pytest tests/core/pubsub/test_dummyaccount_demo.py tests/core/pubsub/test_subscribed_mesh_fixture.py — 11 passed
  • make lint clean (ruff, ruff-format, mypy, pyrefly)

References

Open for review from @acul71 / @seetadev.

Copilot AI review requested due to automatic review settings April 22, 2026 17:27

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Refactors the subscribed_mesh pubsub test helper to wait deterministically for mesh readiness (instead of a fixed sleep), addressing timing-dependent flakiness noted in #1307.

Changes:

  • Replace fixed trio.sleep() wait in subscribed_mesh with predicate-based polling via wait_for()
  • Add regression tests covering readiness success and timeout behavior
  • Add an internal newsfragment documenting the change

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

File Description
tests/core/pubsub/conftest.py Replaces fixed “settle” sleep with a wait_for() predicate that checks per-router mesh readiness.
tests/core/pubsub/test_subscribed_mesh_fixture.py Adds regression tests asserting the fixture yields only after mesh readiness and surfaces timeouts.
newsfragments/1307.internal.rst Documents the internal refactor from fixed sleep to deterministic polling.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +32 to +39
def _flatten(exc: BaseException) -> list[BaseException]:
"""Recursively walk nested ExceptionGroups, returning the leaf exceptions."""
if isinstance(exc, BaseExceptionGroup):
out: list[BaseException] = []
for child in exc.exceptions:
out.extend(_flatten(child))
return out
return [exc]

Copilot AI Apr 22, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribed_mesh(..., ready_timeout=...) timing out may raise TimeoutError directly (not necessarily wrapped in a BaseExceptionGroup), depending on how wait_for() and the surrounding Trio context are implemented. This test is currently too strict and can fail even when behavior is correct. Recommend accepting either: (a) a direct TimeoutError, or (b) an exception group containing a TimeoutError leaf. Also note that referencing BaseExceptionGroup hard-requires Python 3.11+ at import time; if the project test matrix includes <3.11, this module will fail collection—consider guarding with pytest.importorskip, a compatibility shim, or structuring the test to not reference BaseExceptionGroup unconditionally.

Copilot uses AI. Check for mistakes.
Comment on lines +48 to +50
with pytest.raises(BaseExceptionGroup) as exc_info:
async with subscribed_mesh(TOPIC, 3, ready_timeout=0.001):
pytest.fail("subscribed_mesh should have timed out before yielding")

Copilot AI Apr 22, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribed_mesh(..., ready_timeout=...) timing out may raise TimeoutError directly (not necessarily wrapped in a BaseExceptionGroup), depending on how wait_for() and the surrounding Trio context are implemented. This test is currently too strict and can fail even when behavior is correct. Recommend accepting either: (a) a direct TimeoutError, or (b) an exception group containing a TimeoutError leaf. Also note that referencing BaseExceptionGroup hard-requires Python 3.11+ at import time; if the project test matrix includes <3.11, this module will fail collection—consider guarding with pytest.importorskip, a compatibility shim, or structuring the test to not reference BaseExceptionGroup unconditionally.

Copilot uses AI. Check for mistakes.
Comment on lines +45 to +49
# Ask for an absurdly short timeout; the mesh can't form in 1ms.
# Trio's background service managers nest ExceptionGroups, so flatten
# the whole tree and confirm the TimeoutError is a leaf.
with pytest.raises(BaseExceptionGroup) as exc_info:
async with subscribed_mesh(TOPIC, 3, ready_timeout=0.001):

Copilot AI Apr 22, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout-path test relies on an elapsed-time assumption (“can’t form in 1ms”), which can become flaky on fast CI or if mesh formation happens to complete before the first predicate check. To make this deterministic, consider monkeypatching tests.core.pubsub.conftest.wait_for (or the underlying clock/timer used by wait_for) to raise TimeoutError immediately, then assert the fixture propagates the error and message formatting as expected.

Copilot uses AI. Check for mistakes.
Comment on lines +95 to +96
ready_timeout: float = 5.0,
poll_interval: float = 0.02,

Copilot AI Apr 22, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since poll_interval is now a public knob on this helper, it should be validated (e.g., must be > 0) to avoid accidental busy-looping or surprising behavior if a caller passes 0 or a negative value. A small explicit check with a clear error message would make failures easier to diagnose.

Copilot uses AI. Check for mistakes.
Comment on lines +120 to +123
await wait_for(
_mesh_ready,
timeout=ready_timeout,
poll_interval=poll_interval,

Copilot AI Apr 22, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since poll_interval is now a public knob on this helper, it should be validated (e.g., must be > 0) to avoid accidental busy-looping or surprising behavior if a caller passes 0 or a negative value. A small explicit check with a clear error message would make failures easier to diagnose.

Copilot uses AI. Check for mistakes.
yashksaini-coder added a commit to yashksaini-coder/py-libp2p that referenced this pull request Apr 23, 2026
- Validate ready_timeout and poll_interval are > 0 in subscribed_mesh
  so a zero/negative value surfaces as ValueError instead of busy-looping
  or silently timing out
- Make the timeout-path test tolerant of both wrapped and unwrapped
  TimeoutError shapes (nursery strictness varies across upstream CMs)
- Add a test covering the new argument validation
- Support the exceptiongroup backport on Python 3.10, matching the rest
  of the repo's compat pattern
@yashksaini-coder

Copy link
Copy Markdown
Contributor Author

Addressed Copilot feedback in 9395a48:

  • subscribed_mesh now validates ready_timeout > 0 and poll_interval > 0, raising ValueError on bad input instead of silently busy-looping or timing out immediately. Added a test covering this.
  • Relaxed the timeout-path test to accept both raw TimeoutError and BaseExceptionGroup-wrapped forms (since nursery strictness varies across the upstream context-manager stack). The _flatten helper walks either shape.
  • Imported BaseExceptionGroup via the exceptiongroup backport for Python 3.10, matching the existing pattern in libp2p/tools/anyio_service/*.

Left the "1ms elapsed-time flake" suggestion as-is — the mesh genuinely can't form before the first predicate check on any realistic hardware, and monkeypatching wait_for would test the mock more than the code. Happy to revisit if it flakes in CI.

@yashksaini-coder yashksaini-coder changed the title refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture (Fixes #1307) refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture Apr 23, 2026
The subscribed_mesh fixture used a fixed trio.sleep(settle_time) after
every node subscribed, which is timing-dependent and either flaky or
unnecessarily slow. Replace the sleep with a wait_for() poll that waits
until every router's mesh[topic] has at least min(n-1, router.degree_low)
peers before yielding.

The fixture now exposes ready_timeout (default 5s) and poll_interval
(default 20ms) instead of the old settle_time knob. Adds regression tests
covering the ready case and the timeout case. No external callers of
subscribed_mesh exist yet, so this is a safe rename.

Fixes libp2p#1307.
- Validate ready_timeout and poll_interval are > 0 in subscribed_mesh
  so a zero/negative value surfaces as ValueError instead of busy-looping
  or silently timing out
- Make the timeout-path test tolerant of both wrapped and unwrapped
  TimeoutError shapes (nursery strictness varies across upstream CMs)
- Add a test covering the new argument validation
- Support the exceptiongroup backport on Python 3.10, matching the rest
  of the repo's compat pattern
@yashksaini-coder yashksaini-coder force-pushed the refactor/1307-subscribed-mesh-predicate branch from 9395a48 to 73e383c Compare May 21, 2026 07:19
acul71 and others added 4 commits May 28, 2026 17:02
…edicate' into refactor/1307-subscribed-mesh-predicate
test_subscribed_mesh_rejects_unreachable_readiness relied on a 1ms
ready_timeout to force a TimeoutError, but wait_for checks the readiness
predicate before the deadline. When the gossipsub mesh was already grafted
by the first poll, the fixture yielded instead of timing out, so the test
failed on faster runners (linux and windows, 3.12 core).

Add an optional ready_predicate hook to the subscribed_mesh fixture and have
the test inject an unsatisfiable predicate, exercising the timeout path
deterministically regardless of mesh-formation timing. Verified stable over
repeated runs.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

refactor: replace fixed sleeps in pubsub test fixtures with predicate-based polling

3 participants