Skip to content

Faust test hanging while putting data in agents #670

@pbonneaudiabolocom

Description

@pbonneaudiabolocom

When testing, some tests pass when run individually, but as soon as I run them together, the last one fails.

All the tests are identical.
The third one fails at some point while calling put.

I have no idea why, but the hang seems to happen during publish_message.

Any idea why?

Pierre

env: python 3.12 on mac

import asyncio

import pytest

import faust
import os
# This module exists only to demonstrate a full end-to-end test.
# None of the code used by the real application lives here.
# The broker address is meant to reach the INTERNAL docker listener; it can
# be overridden through the KAFKA_BOOTSTRAP_SERVER environment variable.
BROKER_URL = os.environ.get('KAFKA_BOOTSTRAP_SERVER', 'localhost:9092')

app = faust.App(
    'test-sandbox-worker',
    origin='tests',  # Important so that this module is found
    autodiscover=True,
    store='memory://',
    broker='kafka://' + BROKER_URL,
)


# --- Data Models ---
class MockEvent(faust.Record):
    """Input record for the test flow: a key and an integer value."""

    # Key the event belongs to; the agent groups the stream by this field.
    key: str
    # Amount the agent adds to the running total stored for ``key``.
    value: int


class MockResult(faust.Record):
    """Output record of the test flow: a key and its running total."""

    # Key the total belongs to (copied from the triggering event).
    key: str
    # Value of the table entry for ``key`` after the event was applied.
    total: int


# --- Topics ---
# Dedicated test topic names are used to avoid clashing with real data.
# The input topic carries MockEvent values; the output topic carries
# MockResult values.
input_topic = app.topic('test.sandbox.input', value_type=MockEvent)
output_topic = app.topic('test.sandbox.output', value_type=MockResult)

# --- State ---
# Table holding one running total per key. It is created once at module
# level, so every test in this module uses the same table object.
# NOTE(review): default=int presumably makes missing keys start at 0 —
# confirm against the Faust Table documentation.
test_table = app.Table('test_table', default=int)


# --- Agent ---
@app.agent(input_topic, sink=[output_topic])
async def process_test_flow(stream):
    """Add each event's value to its key's total and yield the new total.

    The agent reads from ``input_topic`` and is declared with
    ``output_topic`` as its sink; one ``MockResult`` is yielded per event.
    """
    # The group_by is important for partitioning.
    regrouped = stream.group_by(MockEvent.key)
    async for record in regrouped:
        bucket = record.key

        # Aggregation: accumulate the value into the table entry.
        test_table[bucket] += record.value

        # Emit the updated total for this key.
        yield MockResult(key=bucket, total=test_table[bucket])

@pytest.fixture(scope="function")
async def test_app():
    """Prepare the module-level ``app`` for one test and return it.

    The fixture is function-scoped, so it runs before every test that
    requests it. It does not create a new application: the same ``app``
    object is reconfigured and returned each time.

    NOTE(review): this is an ``async def`` fixture declared with plain
    ``@pytest.fixture``; that presumably relies on pytest-asyncio handling
    it (e.g. auto mode) — confirm against the project's configuration.
    """
    # Point the app at the event loop that is running this fixture.
    # NOTE(review): assumes the test body runs on this same loop — confirm.
    app.loop = asyncio.get_running_loop()
    app.finalize()
    # Select the in-memory store setting for the test run.
    app.conf.store = 'memory://'
    # Resume flow control on the app before handing it to the test.
    app.flow_control.resume()
    return app

@pytest.mark.asyncio
async def test_aggregation_logic_native(test_app):
    """Put 150 events for one key into the agent, one at a time.

    The test fails as soon as a single ``put`` does not complete within
    one second. It makes no assertion about the values the agent yields.
    """
    user_id = "user_123"
    # 50 repetitions of the value triple (10, 20, 5), all for the same key.
    events = [
        MockEvent(key=user_id, value=amount)
        for _ in range(50)
        for amount in (10, 20, 5)
    ]
    async with process_test_flow.test_context() as agent:
        for event in events:
            try:
                await asyncio.wait_for(agent.put(event), timeout=1.0)
            except asyncio.TimeoutError:
                pytest.fail(f"Agent hung while processing {event}")
            # Short pause before sending the next event.
            await asyncio.sleep(0.001)

@pytest.mark.asyncio
async def test_aggregation_logic_native_02(test_app):
    """Put 150 events for one key into the agent, one at a time.

    The test fails as soon as a single ``put`` does not complete within
    one second. It makes no assertion about the values the agent yields.
    """
    user_id = "user_123"
    # 50 repetitions of the value triple (10, 20, 5), all for the same key.
    events = [
        MockEvent(key=user_id, value=amount)
        for _ in range(50)
        for amount in (10, 20, 5)
    ]
    async with process_test_flow.test_context() as agent:
        for event in events:
            try:
                await asyncio.wait_for(agent.put(event), timeout=1.0)
            except asyncio.TimeoutError:
                pytest.fail(f"Agent hung while processing {event}")
            # Short pause before sending the next event.
            await asyncio.sleep(0.001)

@pytest.mark.asyncio
async def test_aggregation_logic_native_03(test_app):
    """Put 150 events for one key into the agent, one at a time.

    The test fails as soon as a single ``put`` does not complete within
    one second. It makes no assertion about the values the agent yields.
    """
    user_id = "user_123"
    # 50 repetitions of the value triple (10, 20, 5), all for the same key.
    events = [
        MockEvent(key=user_id, value=amount)
        for _ in range(50)
        for amount in (10, 20, 5)
    ]
    async with process_test_flow.test_context() as agent:
        for event in events:
            try:
                await asyncio.wait_for(agent.put(event), timeout=1.0)
            except asyncio.TimeoutError:
                pytest.fail(f"Agent hung while processing {event}")
            # Short pause before sending the next event.
            await asyncio.sleep(0.001)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions