-
Notifications
You must be signed in to change notification settings - Fork 200
Open
Description
When testing, I get some tests passing OK individually, but as soon as I run them together, the last one fails.
all tests are identical
The 3rd one will, at some point, fail on a call to put.
I have no idea why, but it seems to happen during publish_message
Any idea why?
Pierre
env: python 3.12 on mac
import asyncio
import pytest
import faust
import os
# This module exists only to demonstrate a full end-to-end test.
# None of the code used in the real application is present here.

# Broker address: taken from KAFKA_BOOTSTRAP_SERVER when set, otherwise
# 'localhost:9092'. Original author's note: connect to the INTERNAL docker
# listener.
BROKER_URL = os.environ.get("KAFKA_BOOTSTRAP_SERVER", "localhost:9092")

# Single module-level Faust application shared by every test in this file.
app = faust.App(
    "test-sandbox-worker",
    broker=f"kafka://{BROKER_URL}",
    store="memory://",
    autodiscover=True,
    origin="tests",  # Important so autodiscovery finds this module
)
# --- Data Models ---
class MockEvent(faust.Record):
    """Input record fed to the sandbox agent."""

    # Key the agent groups the stream by and uses to index the table.
    key: str
    # Amount added to the table entry for `key`.
    value: int
class MockResult(faust.Record):
    """Output record yielded by the sandbox agent for each processed event."""

    # Key the total belongs to.
    key: str
    # Table value for `key` after the event was applied.
    total: int
# --- Topics ---
# Dedicated test topic names so this sandbox never clashes with real data.
input_topic = app.topic(
    "test.sandbox.input",
    value_type=MockEvent,
)
output_topic = app.topic(
    "test.sandbox.output",
    value_type=MockResult,
)

# --- State ---
# Per-key totals; `default=int` supplies the value for keys not yet present.
test_table = app.Table("test_table", default=int)
# --- Agent ---
@app.agent(input_topic, sink=[output_topic])
async def process_test_flow(stream):
    """Accumulate event values per key and yield the updated total.

    Each incoming ``MockEvent`` adds its ``value`` to ``test_table[key]``;
    a ``MockResult`` carrying the key and the table's current total is then
    yielded (the agent is declared with ``output_topic`` as its sink).
    """
    # The group_by is important for partitioning.
    keyed_stream = stream.group_by(MockEvent.key)
    async for event in keyed_stream:
        key = event.key
        # Aggregation: fold the event's value into the table entry.
        test_table[key] += event.value
        # Generate the result from the value now stored in the table.
        yield MockResult(key=key, total=test_table[key])
@pytest.fixture(scope="function")
async def test_app():
    """Prepare the shared module-level ``app`` for one test and return it.

    Binds the app to the currently running event loop, finalizes it, sets
    the store to ``'memory://'`` and resumes flow control.

    NOTE(review): this is an ``async`` fixture declared with plain
    ``pytest.fixture``; whether it is awaited presumably depends on the
    pytest-asyncio mode in use -- confirm against the project's pytest
    configuration.
    """
    running_loop = asyncio.get_running_loop()
    app.loop = running_loop
    app.finalize()
    app.conf.store = 'memory://'
    app.flow_control.resume()
    return app
@pytest.mark.asyncio
async def test_aggregation_logic_native(test_app):
    """Push 150 events for one key through the agent's test context.

    The test fails if any single ``agent.put`` takes longer than one second.
    """
    user_id = "user_123"
    # 50 rounds of the same three values (10, 20, 5), in that order.
    events = [
        MockEvent(key=user_id, value=amount)
        for _ in range(50)
        for amount in (10, 20, 5)
    ]

    async with process_test_flow.test_context() as agent:
        for ev in events:
            try:
                await asyncio.wait_for(agent.put(ev), timeout=1.0)
            except asyncio.TimeoutError:
                pytest.fail(f"Agent hung while processing {ev}")
            # Brief pause between events, handing control back to the loop.
            await asyncio.sleep(0.001)
@pytest.mark.asyncio
async def test_aggregation_logic_native_02(test_app):
    """Second copy of the scenario: 150 events for one key via the agent.

    The test fails if any single ``agent.put`` takes longer than one second.
    """
    user_id = "user_123"
    # 50 rounds of the same three values (10, 20, 5), in that order.
    events = [
        MockEvent(key=user_id, value=amount)
        for _ in range(50)
        for amount in (10, 20, 5)
    ]

    async with process_test_flow.test_context() as agent:
        for ev in events:
            try:
                await asyncio.wait_for(agent.put(ev), timeout=1.0)
            except asyncio.TimeoutError:
                pytest.fail(f"Agent hung while processing {ev}")
            # Brief pause between events, handing control back to the loop.
            await asyncio.sleep(0.001)
@pytest.mark.asyncio
async def test_aggregation_logic_native_03(test_app):
    """Third copy of the scenario: 150 events for one key via the agent.

    The test fails if any single ``agent.put`` takes longer than one second.
    """
    user_id = "user_123"
    # 50 rounds of the same three values (10, 20, 5), in that order.
    events = [
        MockEvent(key=user_id, value=amount)
        for _ in range(50)
        for amount in (10, 20, 5)
    ]

    async with process_test_flow.test_context() as agent:
        for ev in events:
            try:
                await asyncio.wait_for(agent.put(ev), timeout=1.0)
            except asyncio.TimeoutError:
                pytest.fail(f"Agent hung while processing {ev}")
            # Brief pause between events, handing control back to the loop.
            await asyncio.sleep(0.001)
Metadata
Metadata
Assignees
Labels
No labels