fix: agent_trust schema validation + feat: task pause/resume#490
fix: agent_trust schema validation + feat: task pause/resume#490The-Vaibhav-Yadav wants to merge 3 commits intoGetBindu:mainfrom
Conversation
Previously agent_trust was accepted as any arbitrary dict with no type or schema checks, creating a silent misconfiguration surface. Adds AgentTrustConfig (Pydantic BaseModel) in types.py to validate identity_provider, allowed_operations TrustLevel values, and optional field types. Wires _validate_agent_trust_config into ConfigValidator so invalid trust configs raise ValueError with a descriptive message. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
_handle_pause transitions a task from any active state (submitted, working, input-required, auth-required) to 'suspended'. _handle_resume transitions it back to 'submitted' and re-queues a run_task operation so the worker re-executes with the full stored message history. Both methods no-op silently when the task is missing or in an incompatible state, matching the defensive pattern used by cancel_task. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (5)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (2)
📝 WalkthroughWalkthroughAdded a Pydantic model Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Worker
participant Storage
participant Scheduler
rect rgba(100, 150, 200, 0.5)
Note over Client,Scheduler: Pause Flow
Client->>Worker: _handle_pause(task_id)
Worker->>Storage: load_task(task_id)
Storage-->>Worker: task (non-terminal state)
Worker->>Storage: update_task(state="suspended")
Storage-->>Worker: confirmation
Worker-->>Client: pause complete (logged)
end
rect rgba(200, 150, 100, 0.5)
Note over Client,Scheduler: Resume Flow
Client->>Worker: _handle_resume(task_id)
Worker->>Storage: load_task(task_id)
Storage-->>Worker: task (state="suspended")
Worker->>Storage: update_task(state="submitted")
Storage-->>Worker: confirmation
Worker->>Scheduler: run_task(task_id, context_id)
Scheduler-->>Worker: execution queued / error
alt success
Worker-->>Client: resume complete (logged)
else failure
Worker->>Storage: update_task(state="suspended") %% rollback
Storage-->>Worker: confirmation
Worker-->>Client: error re-raised
end
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (6)
bindu/penguin/config_validator.py (1)
299-306: LGTM — minor: narrow the exception.Wrapping pydantic validation into a
ValueErrorwithagent_trustin the message is exactly what issue#382asks for, andfrom excpreserves the chain. Catching bareExceptionis broader than needed;pydantic.ValidationError(plusTypeErrorfor bad kwargs) would be more precise and avoid masking unrelated bugs in future refactors.Proposed change
- try: - AgentTrustConfig(**trust_config) - except Exception as exc: - raise ValueError(f"Invalid 'agent_trust' configuration: {exc}") from exc + try: + AgentTrustConfig(**trust_config) + except (pydantic.ValidationError, TypeError) as exc: + raise ValueError(f"Invalid 'agent_trust' configuration: {exc}") from exc(requires
import pydanticat top.)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bindu/penguin/config_validator.py` around lines 299 - 306, The except block in _validate_agent_trust_config currently catches a bare Exception; narrow it to pydantic.ValidationError and TypeError so only validation/argument errors from AgentTrustConfig(**trust_config) are converted to the ValueError. Update the except clause to catch pydantic.ValidationError and TypeError, keep the raise ValueError(f"Invalid 'agent_trust' configuration: {exc}") from exc, and add the required pydantic import if not already present.tests/unit/penguin/test_config_validator.py (2)
100-113: Test would pass even if validation drops fields — assert on a refetch.
result["agent_trust"] == trustcurrently checks the stored dict, but_process_complex_fieldsdoes not replaceagent_trustwith the validated model (it only validates for side-effects), so this assertion only proves the original dict was preserved, not thatAgentTrustConfigaccepted every field. Consider also asserting viaAgentTrustConfig(**trust)directly (or adding one test that passes an unknown field to confirm it is rejected — which it currently isn't; see related comment onAgentTrustConfig).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/penguin/test_config_validator.py` around lines 100 - 113, The test test_valid_agent_trust_with_all_fields only compares the original dict to result["agent_trust"], which doesn't prove validation preserved/accepted all fields because _process_complex_fields only validates for side-effects; update the test to assert the validated model by constructing AgentTrustConfig from the stored data (e.g., call AgentTrustConfig(**result["agent_trust"]) or AgentTrustConfig(**trust>) to ensure instantiation succeeds) or alternatively assert that AgentTrustConfig(**trust) equals the stored/returned model; reference ConfigValidator.validate_and_process, _process_complex_fields and AgentTrustConfig when making the change.
79-86: Address Ruff RUF012 onBASE_CONFIG.Ruff flags the mutable class attribute. Annotate as
ClassVaror convert to a helper/fixture so it's clear this is immutable shared test data. Also note that_config_with_trustuses{**self.BASE_CONFIG, ...}which is only a shallow copy — the nesteddeploymentdict is shared across tests; fine today since none mutate it, but worth acopy.deepcopyor fixture if tests ever start mutating.Proposed change
-class TestAgentTrustValidation: - """Tests for agent_trust configuration validation.""" - - BASE_CONFIG = { - "author": "test@example.com", - "name": "TestAgent", - "deployment": {"url": "http://localhost:3773"}, - } +class TestAgentTrustValidation: + """Tests for agent_trust configuration validation.""" + + BASE_CONFIG: ClassVar[dict] = { + "author": "test@example.com", + "name": "TestAgent", + "deployment": {"url": "http://localhost:3773"}, + }(add
from typing import ClassVar)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/penguin/test_config_validator.py` around lines 79 - 86, BASE_CONFIG is a mutable class attribute and should be annotated as a ClassVar to satisfy Ruff RUF012 and make intent explicit; update the test class to import ClassVar from typing and declare BASE_CONFIG: ClassVar[dict] = {...}. Also change _config_with_trust to return a deep copy of BASE_CONFIG before updating agent_trust (e.g., use copy.deepcopy) to avoid sharing the nested deployment dict between tests, or alternatively convert BASE_CONFIG into a fixture/helper that yields a fresh dict per test; update references to BASE_CONFIG and _config_with_trust accordingly.bindu/common/protocol/types.py (2)
1684-1700: Considerextra="forbid"to catch typos in user config.
AgentTrustConfiginheritsA2A_MODEL_CONFIGwhich doesn't setextra. Since this model is described as "config-layer validation" (distinct from the wireAgentTrustTypedDict) whose purpose is to reject invalidagent_trustdicts, it should reject unknown keys — otherwise{"identity_provider": "hydra", "identityProvidr": "custom"}or any misspelled optional field silently passes and gets dropped, defeating much of the validation promise of issue#382.Additionally, because
A2A_MODEL_CONFIGsetsalias_generator=to_camel, populate_by_name=True, callers can also supply camelCase keys (identityProvider,inheritedRoles, ...) for a field that is otherwise specified in snake_case throughout the config surface. If the intent is config-layer (snake_case only), consider a dedicatedConfigDict(extra="forbid")here instead of reusing the wire-format config.Proposed change
- model_config = A2A_MODEL_CONFIG + model_config = pydantic.ConfigDict(extra="forbid")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bindu/common/protocol/types.py` around lines 1684 - 1700, AgentTrustConfig currently inherits A2A_MODEL_CONFIG (which sets alias_generator=to_camel/populate_by_name) and does not forbid extra fields, so misspelled or camelCase keys can be accepted silently; update AgentTrustConfig to use a dedicated Pydantic config that sets extra="forbid" (and remove or override alias_generator/populate_by_name if you want to restrict to snake_case) so unknown keys are rejected — modify the model_config used by AgentTrustConfig (or set model_config = {"extra": "forbid", ...} on the class) to enforce extra="forbid" while preserving any other needed settings.
1694-1694:inherited_rolesis looser than the wire-format type.Wire-format
AgentTrust.inherited_rolesisList[KeycloakRole], but this config model acceptsList[Dict[str, Any]]with no nested validation. Issue#382asks for "nested/type validation for trust config fields", so consider validating role entries (e.g.List[KeycloakRole]or a dedicated config role model) rather than untyped dicts. If intentional (to defer role materialization), a comment noting this would help.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bindu/common/protocol/types.py` at line 1694, The inherited_roles field on AgentTrust is declared too loosely as List[Dict[str, Any]]; change it to validate against the wire-format type by using List[KeycloakRole] (or introduce a dedicated config model like KeycloakRoleConfig and use List[KeycloakRoleConfig]) and add appropriate pydantic/type validation so each role entry is validated rather than an untyped dict; if deferring materialization is intentional, add a clear inline comment on AgentTrust.inherited_roles explaining why dicts are used and where/when they will be validated or converted.tests/unit/server/workers/test_manifest_worker.py (1)
804-833: Add coverage for theauth-requiredpause path.The implementation allows pausing
"auth-required"tasks, but these tests only cover"working","submitted", and"input-required". Parametrize the pauseable-state test and include"auth-required"so auth-gated tasks don’t regress.Test refactor sketch
+ `@pytest.mark.parametrize`("state", ["submitted", "input-required", "auth-required"]) `@pytest.mark.asyncio` - async def test_pause_submitted_task_transitions_to_suspended(self): + async def test_pause_pauseable_task_transitions_to_suspended(self, state): worker, mock_storage, _ = self._make_worker() task_id = uuid4() - mock_storage.load_task.return_value = self._make_task(task_id, uuid4(), "submitted") + mock_storage.load_task.return_value = self._make_task(task_id, uuid4(), state) await worker._handle_pause({"task_id": task_id}) mock_storage.update_task.assert_called_once_with(task_id, state="suspended")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/server/workers/test_manifest_worker.py` around lines 804 - 833, The three tests testing pause transitions (test_pause_working_task_transitions_to_suspended, test_pause_submitted_task_transitions_to_suspended, test_pause_input_required_task_transitions_to_suspended) should be combined or refactored into a single parametrized pytest async test that iterates over pauseable states including "working", "submitted", "input-required", and "auth-required"; ensure the test still constructs the worker via self._make_worker(), sets mock_storage.load_task.return_value = self._make_task(task_id, uuid4(), state), calls await worker._handle_pause({"task_id": task_id}), and asserts mock_storage.update_task.assert_called_once_with(task_id, state="suspended") for each state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@bindu/server/workers/base.py`:
- Around line 242-263: The handler _handle_pause currently allows pausing tasks
in "working" via the _PAUSEABLE_STATES set but only updates storage, risking
background execution continuing; either remove "working" from _PAUSEABLE_STATES
to prevent marking actively running tasks suspended until cooperative suspension
is implemented, or implement a cooperative suspension/cancellation handshake in
run_task (e.g., send a cancellation/suspend signal to the running executor and
wait for acknowledgement) and only call storage.update_task(task_id,
state="suspended") after the running task confirms it has stopped; reference
_PAUSEABLE_STATES, _handle_pause, run_task, and storage.update_task when making
the change.
- Around line 274-292: The resume flow currently sets state to "submitted"
before enqueueing which can leave tasks incorrectly transitioned if
scheduler.run_task fails; modify resume in the function that calls
_normalize_uuid/load_task (the resume handler in base.py) so the state
transition is atomic: either perform a storage compare-and-set/transactional
update (e.g., update_task with an expected previous state of "suspended") or
delay changing state until after scheduler.run_task succeeds and on any enqueue
error rollback to "suspended" (or use an outbox write + background enqueue to
ensure durability); reference and update the logic around load_task,
update_task, and scheduler.run_task to implement the CAS/rollback or outbox
pattern so resume never leaves a task in "submitted" unless it is durably
queued.
---
Nitpick comments:
In `@bindu/common/protocol/types.py`:
- Around line 1684-1700: AgentTrustConfig currently inherits A2A_MODEL_CONFIG
(which sets alias_generator=to_camel/populate_by_name) and does not forbid extra
fields, so misspelled or camelCase keys can be accepted silently; update
AgentTrustConfig to use a dedicated Pydantic config that sets extra="forbid"
(and remove or override alias_generator/populate_by_name if you want to restrict
to snake_case) so unknown keys are rejected — modify the model_config used by
AgentTrustConfig (or set model_config = {"extra": "forbid", ...} on the class)
to enforce extra="forbid" while preserving any other needed settings.
- Line 1694: The inherited_roles field on AgentTrust is declared too loosely as
List[Dict[str, Any]]; change it to validate against the wire-format type by
using List[KeycloakRole] (or introduce a dedicated config model like
KeycloakRoleConfig and use List[KeycloakRoleConfig]) and add appropriate
pydantic/type validation so each role entry is validated rather than an untyped
dict; if deferring materialization is intentional, add a clear inline comment on
AgentTrust.inherited_roles explaining why dicts are used and where/when they
will be validated or converted.
In `@bindu/penguin/config_validator.py`:
- Around line 299-306: The except block in _validate_agent_trust_config
currently catches a bare Exception; narrow it to pydantic.ValidationError and
TypeError so only validation/argument errors from
AgentTrustConfig(**trust_config) are converted to the ValueError. Update the
except clause to catch pydantic.ValidationError and TypeError, keep the raise
ValueError(f"Invalid 'agent_trust' configuration: {exc}") from exc, and add the
required pydantic import if not already present.
In `@tests/unit/penguin/test_config_validator.py`:
- Around line 100-113: The test test_valid_agent_trust_with_all_fields only
compares the original dict to result["agent_trust"], which doesn't prove
validation preserved/accepted all fields because _process_complex_fields only
validates for side-effects; update the test to assert the validated model by
constructing AgentTrustConfig from the stored data (e.g., call
AgentTrustConfig(**result["agent_trust"]) or AgentTrustConfig(**trust>) to
ensure instantiation succeeds) or alternatively assert that
AgentTrustConfig(**trust) equals the stored/returned model; reference
ConfigValidator.validate_and_process, _process_complex_fields and
AgentTrustConfig when making the change.
- Around line 79-86: BASE_CONFIG is a mutable class attribute and should be
annotated as a ClassVar to satisfy Ruff RUF012 and make intent explicit; update
the test class to import ClassVar from typing and declare BASE_CONFIG:
ClassVar[dict] = {...}. Also change _config_with_trust to return a deep copy of
BASE_CONFIG before updating agent_trust (e.g., use copy.deepcopy) to avoid
sharing the nested deployment dict between tests, or alternatively convert
BASE_CONFIG into a fixture/helper that yields a fresh dict per test; update
references to BASE_CONFIG and _config_with_trust accordingly.
In `@tests/unit/server/workers/test_manifest_worker.py`:
- Around line 804-833: The three tests testing pause transitions
(test_pause_working_task_transitions_to_suspended,
test_pause_submitted_task_transitions_to_suspended,
test_pause_input_required_task_transitions_to_suspended) should be combined or
refactored into a single parametrized pytest async test that iterates over
pauseable states including "working", "submitted", "input-required", and
"auth-required"; ensure the test still constructs the worker via
self._make_worker(), sets mock_storage.load_task.return_value =
self._make_task(task_id, uuid4(), state), calls await
worker._handle_pause({"task_id": task_id}), and asserts
mock_storage.update_task.assert_called_once_with(task_id, state="suspended") for
each state.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bb088941-3b37-4927-8c08-f2fcc6009332
📒 Files selected for processing (5)
bindu/common/protocol/types.pybindu/penguin/config_validator.pybindu/server/workers/base.pytests/unit/penguin/test_config_validator.pytests/unit/server/workers/test_manifest_worker.py
| _PAUSEABLE_STATES = frozenset({"submitted", "working", "input-required", "auth-required"}) | ||
|
|
||
| async def _handle_pause(self, params: TaskIdParams) -> None: | ||
| """Handle pause operation. | ||
| """Pause a task by transitioning it to the 'suspended' state. | ||
|
|
||
| TODO: Implement task pause functionality | ||
| - Save current execution state | ||
| - Update task to 'suspended' state | ||
| - Release resources while preserving context | ||
| No-ops silently if the task is not found or already in a non-pauseable | ||
| state (terminal or already suspended), so callers do not need to guard. | ||
| """ | ||
| raise NotImplementedError("Pause operation not yet implemented") | ||
| task_id = self._normalize_uuid(params["task_id"]) | ||
| task = await self.storage.load_task(task_id) | ||
| if task is None: | ||
| logger.warning(f"Pause requested for unknown task {task_id}") | ||
| return | ||
|
|
||
| current_state = task["status"]["state"] | ||
| if current_state not in self._PAUSEABLE_STATES: | ||
| logger.warning( | ||
| f"Cannot pause task {task_id}: state '{current_state}' is not pauseable" | ||
| ) | ||
| return | ||
|
|
||
| await self.storage.update_task(task_id, state="suspended") |
There was a problem hiding this comment.
Don’t mark a working task suspended without stopping execution.
_handle_pause only rewrites storage state. If the task is actively executing, the worker/agent can continue and later overwrite "suspended" with "completed"/"failed", so pause appears successful while work continues. Either remove "working" until cooperative suspension/checkpointing is wired into run_task, or add that mechanism before setting "suspended".
Minimal safety direction
- _PAUSEABLE_STATES = frozenset({"submitted", "working", "input-required", "auth-required"})
+ _PAUSEABLE_STATES = frozenset({"submitted", "input-required", "auth-required"})If working pause support is required in this PR, the status update should be paired with a cancellation/suspension signal that running task execution observes before it can write a terminal state.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| _PAUSEABLE_STATES = frozenset({"submitted", "working", "input-required", "auth-required"}) | |
| async def _handle_pause(self, params: TaskIdParams) -> None: | |
| """Handle pause operation. | |
| """Pause a task by transitioning it to the 'suspended' state. | |
| TODO: Implement task pause functionality | |
| - Save current execution state | |
| - Update task to 'suspended' state | |
| - Release resources while preserving context | |
| No-ops silently if the task is not found or already in a non-pauseable | |
| state (terminal or already suspended), so callers do not need to guard. | |
| """ | |
| raise NotImplementedError("Pause operation not yet implemented") | |
| task_id = self._normalize_uuid(params["task_id"]) | |
| task = await self.storage.load_task(task_id) | |
| if task is None: | |
| logger.warning(f"Pause requested for unknown task {task_id}") | |
| return | |
| current_state = task["status"]["state"] | |
| if current_state not in self._PAUSEABLE_STATES: | |
| logger.warning( | |
| f"Cannot pause task {task_id}: state '{current_state}' is not pauseable" | |
| ) | |
| return | |
| await self.storage.update_task(task_id, state="suspended") | |
| _PAUSEABLE_STATES = frozenset({"submitted", "input-required", "auth-required"}) | |
| async def _handle_pause(self, params: TaskIdParams) -> None: | |
| """Pause a task by transitioning it to the 'suspended' state. | |
| No-ops silently if the task is not found or already in a non-pauseable | |
| state (terminal or already suspended), so callers do not need to guard. | |
| """ | |
| task_id = self._normalize_uuid(params["task_id"]) | |
| task = await self.storage.load_task(task_id) | |
| if task is None: | |
| logger.warning(f"Pause requested for unknown task {task_id}") | |
| return | |
| current_state = task["status"]["state"] | |
| if current_state not in self._PAUSEABLE_STATES: | |
| logger.warning( | |
| f"Cannot pause task {task_id}: state '{current_state}' is not pauseable" | |
| ) | |
| return | |
| await self.storage.update_task(task_id, state="suspended") |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@bindu/server/workers/base.py` around lines 242 - 263, The handler
_handle_pause currently allows pausing tasks in "working" via the
_PAUSEABLE_STATES set but only updates storage, risking background execution
continuing; either remove "working" from _PAUSEABLE_STATES to prevent marking
actively running tasks suspended until cooperative suspension is implemented, or
implement a cooperative suspension/cancellation handshake in run_task (e.g.,
send a cancellation/suspend signal to the running executor and wait for
acknowledgement) and only call storage.update_task(task_id, state="suspended")
after the running task confirms it has stopped; reference _PAUSEABLE_STATES,
_handle_pause, run_task, and storage.update_task when making the change.
- Narrow except clause in _validate_agent_trust_config to pydantic.ValidationError and TypeError (was bare Exception) - Add docstring to _validate_agent_trust_config - Set extra="forbid" on AgentTrustConfig so unknown/camelCase keys are rejected rather than silently dropped - Annotate BASE_CONFIG as ClassVar and use deepcopy in _config_with_trust to prevent test state leaking across runs - Replace all-fields equality check with AgentTrustConfig round-trip to prove validation preserves field values - Collapse three identical pause-state tests into one @parametrize test covering all four pauseable states - Roll back task to "suspended" if scheduler.run_task fails during resume so the task is never stranded in submitted state without a worker Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
|
Addressed all review feedback in the latest commit — narrowed exception types, added extra="forbid", ClassVar annotation, parametrized tests, and resume rollback. |
Summary
_handle_resume in Worker raised NotImplementedError, blocking any caller that issued pause/resume operations.
failed instead of suspending them.
_handle_resume (→ submitted + re-queue) in the base Worker.
behavior.
Change Type (select all that apply)
Scope:
Linked Issue/PR
User-Visible / Behavior Changes
Security Impact (required)
Verification
Environment
Steps to Test
Expected Behavior
Actual Behavior
Evidence (attach at least one)
tests/unit/penguin/test_config_validator.py::TestAgentTrustValidation::test_valid_agent_trust_hydra PASSED
tests/unit/penguin/test_config_validator.py::TestAgentTrustValidation::test_agent_trust_missing_identity_provider_raises PASSED
tests/unit/penguin/test_config_validator.py::TestAgentTrustValidation::test_agent_trust_invalid_identity_provider_raises PASSED
... (15 total)
tests/unit/server/workers/test_manifest_worker.py::TestWorkerPauseResume::test_pause_working_task_transitions_to_suspended PASSED
tests/unit/server/workers/test_manifest_worker.py::TestWorkerPauseResume::test_resume_suspended_task_resubmits PASSED
... (8 total)
845 passed, 3 skipped, 7 warnings
Human Verification (required)
tasks, resume re-queues with correct task_id/context_id, both operations no-op on unknown task IDs.
Compatibility / Migration
Failure Recovery (if this breaks)
restore the NotImplementedError stubs.
Risks and Mitigations
configs.
Checklist
Summary by CodeRabbit
New Features
Tests