Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies = [
"langchain-text-splitters>=1.0,<2.0",
"beautifulsoup4>=4.12",
"psycopg[binary]>=3.2",
"toolguard>=0.2.17",
"cuga-oak-health; python_version>='3.12'",
"aiosmtpd",
]
Expand Down
8 changes: 8 additions & 0 deletions src/cuga/backend/cuga_graph/policy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
PolicyType,
Playbook,
IntentGuard,
ToolGuide,
ToolGuard,
ToolApproval,
OutputFormatter,
CustomPolicy,
PlaybookStep,
IntentGuardResponse,
Expand All @@ -31,6 +35,10 @@
"PolicyType",
"Playbook",
"IntentGuard",
"ToolGuide",
"ToolGuard",
"ToolApproval",
"OutputFormatter",
"CustomPolicy",
"PlaybookStep",
"IntentGuardResponse",
Expand Down
11 changes: 8 additions & 3 deletions src/cuga/backend/cuga_graph/policy/filesystem_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def _policy_to_markdown(self, policy: Policy) -> str:
# Build frontmatter
# Get policy type - try 'type' first, then 'policy_type'
policy_type = getattr(policy, 'type', None) or getattr(policy, 'policy_type', None)
policy_type_value = policy_type.value if hasattr(policy_type, 'value') else str(policy_type)
policy_type_value = policy_type.value if policy_type is not None and hasattr(policy_type, 'value') else str(policy_type)

frontmatter = {
'id': policy.id,
Expand All @@ -103,10 +103,11 @@ def _policy_to_markdown(self, policy: Policy) -> str:
}

# Add triggers if present
if hasattr(policy, 'triggers') and policy.triggers:
policy_triggers = getattr(policy, 'triggers', None)
if policy_triggers:
triggers_config = {}

for trigger in policy.triggers:
for trigger in policy_triggers:
if isinstance(trigger, KeywordTrigger):
triggers_config['keywords'] = trigger.value
triggers_config['target'] = trigger.target
Expand All @@ -133,6 +134,10 @@ def _policy_to_markdown(self, policy: Policy) -> str:
frontmatter['target_tools'] = policy.target_tools
if policy.target_apps:
frontmatter['target_apps'] = policy.target_apps
if policy.tool_guards:
frontmatter['tool_guards'] = {
tool_name: guard.model_dump() for tool_name, guard in policy.tool_guards.items()
}
frontmatter['prepend'] = policy.prepend
content = policy.guide_content or ""
elif isinstance(policy, IntentGuard):
Expand Down
25 changes: 18 additions & 7 deletions src/cuga/backend/cuga_graph/policy/folder_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@

import os
from pathlib import Path
from typing import Dict, Any, List
from typing import Dict, Any, List, cast
import yaml
from loguru import logger

from cuga.backend.cuga_graph.policy.models import (
Playbook,
OutputFormatter,
ToolGuide,
ToolGuard,
IntentGuard,
ToolApproval,
KeywordTrigger,
NaturalLanguageTrigger,
AlwaysTrigger,
IntentGuardResponse,
Trigger,
)


Expand Down Expand Up @@ -123,7 +125,7 @@ def create_playbook_from_markdown(
raise ValueError(f"Playbook in {file_path} missing 'name' in frontmatter")

triggers_config = frontmatter.get('triggers', {})
triggers = create_triggers_from_metadata(triggers_config)
triggers = cast(List[Trigger], create_triggers_from_metadata(triggers_config))

if not triggers:
raise ValueError(f"Playbook {name} must have at least one trigger")
Expand Down Expand Up @@ -160,10 +162,10 @@ def create_output_formatter_from_markdown(
raise ValueError(f"OutputFormatter in {file_path} missing 'name' in frontmatter")

triggers_config = frontmatter.get('triggers', {})
triggers = create_triggers_from_metadata(triggers_config)
triggers = cast(List[Trigger], create_triggers_from_metadata(triggers_config))

if not triggers:
triggers = [AlwaysTrigger()]
triggers = cast(List[Trigger], [AlwaysTrigger()])

format_type = frontmatter.get('format_type', 'markdown')
if format_type not in ['markdown', 'json_schema', 'direct']:
Expand Down Expand Up @@ -205,10 +207,17 @@ def create_tool_guide_from_markdown(
raise ValueError(f"ToolGuide {name} must specify 'target_tools'")

triggers_config = frontmatter.get('triggers', {})
triggers = create_triggers_from_metadata(triggers_config)
triggers = cast(List[Trigger], create_triggers_from_metadata(triggers_config))

if not triggers:
triggers = [AlwaysTrigger()]
triggers = cast(List[Trigger], [AlwaysTrigger()])

raw_tool_guards = frontmatter.get('tool_guards')
tool_guards = (
{tool_name: ToolGuard(**guard_config) for tool_name, guard_config in raw_tool_guards.items()}
if isinstance(raw_tool_guards, dict)
else None
)

return ToolGuide(
id=frontmatter.get('id', f"tool_guide_{Path(file_path).stem}"),
Expand All @@ -218,6 +227,7 @@ def create_tool_guide_from_markdown(
target_tools=target_tools,
target_apps=frontmatter.get('target_apps'),
guide_content=content,
tool_guards=tool_guards,
prepend=frontmatter.get('prepend', False),
priority=frontmatter.get('priority', 50),
enabled=frontmatter.get('enabled', True),
Expand All @@ -244,7 +254,7 @@ def create_intent_guard_from_markdown(
raise ValueError(f"IntentGuard in {file_path} missing 'name' in frontmatter")

triggers_config = frontmatter.get('triggers', {})
triggers = create_triggers_from_metadata(triggers_config)
triggers = cast(List[Trigger], create_triggers_from_metadata(triggers_config))

if not triggers:
raise ValueError(f"IntentGuard {name} must have at least one trigger")
Expand All @@ -261,6 +271,7 @@ def create_intent_guard_from_markdown(
response=IntentGuardResponse(
response_type=response_type,
content=content,
status_code=frontmatter.get('status_code'),
),
allow_override=frontmatter.get('allow_override', False),
priority=frontmatter.get('priority', 50),
Expand Down
23 changes: 23 additions & 0 deletions src/cuga/backend/cuga_graph/policy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ class IntentGuardResponse(BaseModel):
status_code: Optional[int] = Field(None, description="HTTP status code if applicable")


class ToolGuard(BaseModel):
"""Guard configuration for a specific tool with compliance rules."""

violating_examples: List[str] = Field(
default_factory=list, description="Examples of violating usage patterns"
)
compliance_examples: List[str] = Field(
default_factory=list, description="Examples of compliant usage patterns"
)
policy_code: str = Field(
default="",
description=(
"Python code that validates tool usage compliance. "
"This code is executed in a sandboxed environment using the toolguard library. "
"Only trusted administrators with manage access should be allowed to modify policy code. "
"While sandboxed, policy code should still be reviewed for correctness and performance."
)
)

Comment thread
naamaz marked this conversation as resolved.

class IntentGuard(BaseModel):
"""Guard that intercepts intents and provides custom responses."""

Expand Down Expand Up @@ -214,6 +234,9 @@ class ToolGuide(BaseModel):
None, description="List of app names to enrich tools for (optional)"
)
guide_content: str = Field(..., description="Markdown content to append to tool descriptions")
tool_guards: Optional[Dict[str, ToolGuard]] = Field(
default=None, description="Optional guard configurations per tool (key: tool_name, value: ToolGuard)"
)
prepend: bool = Field(False, description="Whether to prepend content instead of appending")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
priority: int = Field(0, description="Priority when multiple guides match (higher = more important)")
Expand Down
51 changes: 51 additions & 0 deletions src/cuga/backend/cuga_graph/policy/tests/test_filesystem_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
IntentGuard,
Playbook,
ToolGuide,
ToolGuard,
ToolApproval,
OutputFormatter,
KeywordTrigger,
Expand Down Expand Up @@ -164,6 +165,39 @@ async def test_save_policy_to_filesystem(
for expected in expected_content:
assert expected in content

@pytest.mark.asyncio
async def test_save_tool_guide_with_tool_guards_to_filesystem(self, temp_cuga_folder):
"""Test saving tool_guards in ToolGuide frontmatter."""
fs_sync = PolicyFilesystemSync(cuga_folder=temp_cuga_folder)
policy = ToolGuide(
id="test_guide_with_guards",
name="Guide With Guards",
description="Test tool guide with per-tool guard configuration",
triggers=[AlwaysTrigger()],
target_tools=["test_tool"],
target_apps=None,
guide_content="## Guidelines\n- Be careful",
tool_guards={
"test_tool": ToolGuard(
violating_examples=["Deleting all records without confirmation"],
compliance_examples=["Delete one record after explicit confirmation"],
policy_code="def validate(call):\n return True",
)
},
prepend=False,
priority=0,
enabled=True,
)

file_path = fs_sync.save_policy_to_file(policy)

with open(file_path, "r", encoding="utf-8") as f:
content = f.read()

assert "tool_guards:" in content
assert "test_tool:" in content
assert "Only allow safe usage" in content

@pytest.mark.asyncio
async def test_delete_policy_file(self, temp_cuga_folder):
"""Test deleting a policy file"""
Expand Down Expand Up @@ -376,7 +410,18 @@ async def test_auto_load_multiple_policy_types(self, temp_cuga_folder):
description="Test",
triggers=[AlwaysTrigger()],
target_tools=["*"],
target_apps=None,
guide_content="## Test",
tool_guards={
"test_tool": ToolGuard(
violating_examples=["bad_example()"],
compliance_examples=["good_example()"],
policy_code="def validate(call):\n return True",
)
},
prepend=False,
priority=0,
enabled=True,
)

fs_sync.save_policy_to_file(guard)
Expand All @@ -399,6 +444,12 @@ async def test_auto_load_multiple_policy_types(self, temp_cuga_folder):
assert "playbook" in policy_types
assert "tool_guide" in policy_types

loaded_guide = await agent.policies.get("guide_1")
assert loaded_guide is not None
assert loaded_guide["policy"].tool_guards is not None
assert "test_tool" in loaded_guide["policy"].tool_guards
# ToolGuard no longer has description field - it's derived from ToolGuide

@pytest.mark.asyncio
async def test_auto_load_disabled(self, temp_cuga_folder):
"""Test that auto-load can be disabled"""
Expand Down
13 changes: 13 additions & 0 deletions src/cuga/backend/cuga_graph/policy/tool_guard/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""
Tool Guard integration for CUGA.

This module provides integration between CUGA's tool system and Toolguard's
policy enforcement framework.
"""

from .manager import ToolGuardManager
from .tool_guard_runtime import ToolGuardRuntime

__all__ = ["ToolGuardManager", "ToolGuardRuntime"]


Loading