Problem
Currently, users must explicitly collect and pass activities to the worker:
# Current approach - verbose and error-prone
activities = collect_activities(
    temporal_multitool,
    calculator_tool,
    search_web_tool,
    get_current_time_tool,
)
async with Worker(
    client,
    task_queue="queue",
    workflows=[MultiToolAgentWorkflow],
    activities=activities,  # Must manually list all activities
    ...
)
This is:
- Error-prone: Easy to forget a tool or module
- Verbose: Duplicates information already in the workflow
- Not DRY: The workflow already knows its modules, so listing them again is redundant
Pydantic AI already solves this with a plugin (AgentPlugin) that automatically collects activities and passes them to the worker. The same pattern could be applied to TemporalModule:
from pydantic_ai.durable_exec.temporal import AgentPlugin

agent = dspy.ReAct(
    "question -> answer",
    tools=[evaluate_math, search_web_tool, get_current_time_tool],
)
temporal_agent = TemporalModule(agent, name="temporal_agent")
async with Worker(
    client,
    task_queue="queue",
    workflows=[MultiToolAgentWorkflow],  # Some workflows using DSPy programs
    plugins=[AgentPlugin(temporal_agent)],
    ...
)
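To make the collection step concrete, here is a minimal, library-free sketch of what such a plugin might do internally. Everything in it is an assumption made for illustration: `ToolModule`, `ModulePlugin`, and `configure_worker` are hypothetical names, the worker config is modelled as a plain dict, and none of this is the actual Temporal or Pydantic AI API.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToolModule:
    """Hypothetical stand-in for a module that knows its own activities."""

    name: str
    activities: list[Callable[..., Any]] = field(default_factory=list)


class ModulePlugin:
    """Hypothetical plugin that merges module activities into a worker config."""

    def __init__(self, *modules: ToolModule) -> None:
        self.modules = modules

    def configure_worker(self, config: dict[str, Any]) -> dict[str, Any]:
        # Start from any activities the user listed explicitly.
        merged = list(config.get("activities", []))
        for module in self.modules:
            for activity in module.activities:
                if activity not in merged:  # skip tools shared across modules
                    merged.append(activity)
        # Return a new config rather than mutating the caller's dict.
        return {**config, "activities": merged}


# Placeholder tools for the example; the bodies are intentionally empty.
def evaluate_math(expression: str) -> str: ...
def search_web(query: str) -> str: ...


agent = ToolModule("temporal_agent", [evaluate_math, search_web])
other = ToolModule("other_agent", [search_web])
config = ModulePlugin(agent, other).configure_worker({"task_queue": "queue"})
```

With this shape, the user only names the modules. The plugin derives the activity list and deduplicates tools that appear in more than one module, which is the step that is easy to get wrong by hand.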
Benefits
- Less boilerplate: No collect_activities() calls
Priority
High (Developer Experience)
References
- Pydantic AI implementation: pydantic_ai/durable_exec/temporal/__init__.py:97-120
- Current example: examples/multitool_agent.py:230-235