Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions src/conductor/config/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def get(self, key: str, default: object = None) -> object:
def validate_workflow_config(
config: WorkflowConfig,
workflow_path: Path | None = None,
*,
_visited_subworkflows: frozenset[tuple[int, int]] | None = None,
_subworkflow_depth: int = 0,
) -> list[str]:
"""Perform comprehensive validation of a workflow configuration.

Expand All @@ -98,6 +101,12 @@ def validate_workflow_config(
Args:
config: The WorkflowConfig to validate.
workflow_path: Optional path to the workflow file (for !file resolution).
_visited_subworkflows: Internal — set of canonical (st_dev, st_ino)
tuples for sub-workflow files already on the validation stack,
used for cycle detection in recursive sub-workflow validation.
External callers should leave this as ``None``.
_subworkflow_depth: Internal — current recursion depth for
sub-workflow validation. External callers should leave this as 0.

Returns:
A list of warning messages (non-fatal issues).
Expand Down Expand Up @@ -193,6 +202,19 @@ def validate_workflow_config(
"inline agent. Script steps cannot be used in for_each groups."
)

# Validate sub-workflow references (local paths and registry refs).
# Skipped when workflow_path is not provided — relative paths cannot be
# resolved without knowing the file's location.
if workflow_path is not None:
sub_errors, sub_warnings = _validate_subworkflow_refs(
config,
workflow_path,
_visited=_visited_subworkflows,
_depth=_subworkflow_depth,
)
errors.extend(sub_errors)
warnings.extend(sub_warnings)

# Validate workflow output references
output_errors = _validate_output_references(
config.output,
Expand Down Expand Up @@ -749,6 +771,183 @@ def _collect_template_strings(
return templates


# Maximum depth for recursive sub-workflow validation to prevent infinite loops.
_MAX_SUBWORKFLOW_VALIDATION_DEPTH = 10


def _validate_subworkflow_refs(
    config: WorkflowConfig,
    workflow_path: Path | None,
    _visited: frozenset[tuple[int, int]] | None = None,
    _depth: int = 0,
) -> tuple[list[str], list[str]]:
    """Validate all ``type: workflow`` agent references in *config*.

    For local paths, checks that the file exists. For registry references,
    fetches the workflow to the local cache and recursively validates the
    full composition tree. Cycle detection uses inode identity so that the
    same file referenced via different cases (on case-insensitive
    filesystems like macOS/Windows) or via symlinks resolves to the same
    canonical key.

    Args:
        config: The workflow configuration to validate.
        workflow_path: Path of the workflow file being validated (used as the
            base directory for relative sub-workflow paths). When ``None``,
            relative references resolve against the current working directory.
        _visited: Set of already-visited canonical (st_dev, st_ino) tuples
            for cycle detection. Callers should leave this as ``None``; it is
            threaded through recursive calls.
        _depth: Current recursion depth (internal). When the depth reaches
            :data:`_MAX_SUBWORKFLOW_VALIDATION_DEPTH` and the workflow still
            references sub-workflows, recursion stops and a warning is
            emitted so callers know the validation tree was truncated.

    Returns:
        Tuple of (error messages, warning messages).
    """
    if _visited is None:
        _visited = frozenset()

    errors: list[str] = []
    warnings: list[str] = []

    # Collect all (workflow_ref, context_label) pairs up front so the
    # depth-limit warning below only fires when there is actually deeper
    # nesting being skipped. (Warning unconditionally at max depth would
    # flag leaf workflows that have no sub-workflows at all.)
    candidates: list[tuple[str, str]] = []
    for agent in config.agents:
        if agent.type == "workflow" and agent.workflow:
            candidates.append((agent.workflow, f"agent '{agent.name}'"))
    for fe in config.for_each:
        fe_agent = fe.agent
        if fe_agent.type == "workflow" and fe_agent.workflow:
            candidates.append(
                (fe_agent.workflow, f"for_each group '{fe.name}' agent '{fe_agent.name}'")
            )

    if not candidates:
        # Nothing to validate — also avoids a spurious truncation warning.
        return errors, warnings

    if _depth >= _MAX_SUBWORKFLOW_VALIDATION_DEPTH:
        warnings.append(
            f"Sub-workflow validation depth limit "
            f"({_MAX_SUBWORKFLOW_VALIDATION_DEPTH}) reached; "
            "deeper sub-workflows were not validated. "
            "Reduce nesting or check for unintended cycles."
        )
        return errors, warnings

    base_dir = workflow_path.resolve().parent if workflow_path is not None else Path.cwd()

    for workflow_ref, label in candidates:
        sub_path, ref_errors = _resolve_subworkflow_ref_for_validation(
            workflow_ref, label, base_dir
        )
        errors.extend(ref_errors)
        if sub_path is None:
            continue

        # Use inode identity (st_dev, st_ino) for cycle detection so that the
        # same file referenced via different cases (case-insensitive
        # filesystems) or different relative paths resolves to one key.
        try:
            stat_result = sub_path.stat()
            canonical: tuple[int, int] = (stat_result.st_dev, stat_result.st_ino)
        except OSError as exc:
            # Should be rare since _resolve_subworkflow_ref_for_validation
            # already returned a path it considered valid, but stat() can
            # still fail on some platforms (e.g. permission errors).
            errors.append(f"{label}: cannot stat sub-workflow file '{sub_path}': {exc}")
            continue

        if canonical in _visited:
            errors.append(
                f"{label}: circular sub-workflow reference detected "
                f"('{workflow_ref}' → '{sub_path}' is already in the validation chain)"
            )
            continue

        # Recursively validate the sub-workflow. Loading can fail for many
        # reasons (parse error, I/O); report it as an error on the
        # referencing workflow rather than crashing validation.
        try:
            from conductor.config.loader import load_config

            sub_config = load_config(sub_path)
        except Exception as exc:
            errors.append(f"{label}: failed to load sub-workflow '{sub_path}': {exc}")
            continue

        try:
            # Thread _visited and _depth through validate_workflow_config so
            # nested sub-workflow validation also gets cycle detection.
            sub_warnings = validate_workflow_config(
                sub_config,
                workflow_path=sub_path,
                _visited_subworkflows=_visited | {canonical},
                _subworkflow_depth=_depth + 1,
            )
            warnings.extend(f"{label} → sub-workflow '{sub_path.name}': {w}" for w in sub_warnings)
        except ConfigurationError as exc:
            errors.append(f"{label}: sub-workflow '{sub_path.name}' failed validation: {exc}")

    return errors, warnings


def _resolve_subworkflow_ref_for_validation(
    workflow_ref: str,
    label: str,
    base_dir: Path,
) -> tuple[Path | None, list[str]]:
    """Resolve a ``workflow:`` field value to a local path for validation.

    Mirrors the engine's ``_resolve_subworkflow_path`` but is synchronous and
    returns errors as a list rather than raising.

    Args:
        workflow_ref: The raw ``workflow:`` field value.
        label: Human-readable context for error messages.
        base_dir: Base directory for relative path resolution.

    Returns:
        Tuple of (resolved path or None on error, list of error strings).
    """
    from conductor.registry.errors import RegistryError
    from conductor.registry.resolver import resolve_ref

    problems: list[str] = []

    # A file sitting beside the parent workflow always takes precedence
    # over any registry interpretation of the reference.
    local_candidate = (base_dir / workflow_ref).resolve()
    if local_candidate.is_file():
        return local_candidate, problems

    try:
        parsed = resolve_ref(workflow_ref)
    except RegistryError as exc:
        problems.append(f"{label}: invalid sub-workflow reference '{workflow_ref}': {exc}")
        return None, problems

    if parsed.kind == "file":
        # Looked like a plain file path, but no such file exists.
        problems.append(f"{label}: sub-workflow file not found: '{local_candidate}'")
        return None, problems

    # Registry reference: fetch (uses cache; makes network request on first access).
    from conductor.registry.cache import fetch_workflow

    # registry_name, registry_entry, and workflow are always set when kind == "registry"
    assert parsed.registry_name is not None  # noqa: S101
    assert parsed.registry_entry is not None  # noqa: S101
    assert parsed.workflow is not None  # noqa: S101

    try:
        cached_path = fetch_workflow(
            parsed.registry_name,
            parsed.registry_entry,
            parsed.workflow,
            parsed.ref,
        )
    except RegistryError as exc:
        problems.append(f"{label}: failed to fetch registry sub-workflow '{workflow_ref}': {exc}")
        return None, problems

    return cached_path, problems


def _validate_template_references(
config: WorkflowConfig,
workflow_path: Path | None = None,
Expand Down
108 changes: 104 additions & 4 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,106 @@ def _build_subworkflow_inputs(
workflow_ctx = context.get("workflow", {})
return dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {}

async def _resolve_subworkflow_path(
    self,
    agent_workflow: str,
    agent_name: str,
    base_dir: Path,
) -> Path:
    """Resolve a sub-workflow reference to a local filesystem path.

    Handles both local file paths and registry references
    (``workflow[@registry][#ref]`` syntax), in this order:

    1. An existing file relative to ``base_dir`` wins outright. This keeps
       bare sibling names like ``analysis`` (no ``.yaml`` extension)
       working as local files, never registry lookups.
    2. Otherwise the reference is parsed with
       :func:`~conductor.registry.resolver.resolve_ref`.
    3. A parse result that is still file-like (e.g. a ``.yaml`` path that
       does not exist) is returned as the resolved candidate path so the
       caller can emit a clear "file not found" error.
    4. A registry reference is fetched (with caching) and the cached local
       path is returned.

    Note on checkpoint/resume: this helper runs on every sub-workflow
    execution, including after :meth:`resume`. Pinned registry refs
    (``name@registry#v1.2.3`` or ``name@registry#<sha>``) always resolve
    to the same cached path; mutable refs (``name@registry#main`` or no
    ``#ref``, defaulting to "latest") may resolve to a different commit on
    resume if the upstream branch has moved. Use pinned tags or commit
    SHAs in production workflows when deterministic resume is required.

    Args:
        agent_workflow: The ``workflow:`` field value from the agent def.
        agent_name: Name of the containing agent (used in error messages).
        base_dir: Directory of the parent workflow file used for relative
            path resolution.

    Returns:
        Absolute path to the workflow YAML (local or cached registry copy).

    Raises:
        ExecutionError: If the registry reference is malformed, names an
            unknown registry, or the registry fetch fails.
    """
    from conductor.registry.errors import RegistryError
    from conductor.registry.resolver import resolve_ref

    # Step 1: an existing file relative to base_dir short-circuits
    # everything else.
    local_path = (base_dir / agent_workflow).resolve()
    if local_path.is_file():
        return local_path

    # Step 2: heuristic parse — file-looking path or registry reference?
    try:
        parsed = resolve_ref(agent_workflow)
    except RegistryError as exc:
        raise ExecutionError(
            f"Failed to resolve sub-workflow '{agent_workflow}' "
            f"(referenced by agent '{agent_name}'): {exc}",
            suggestion=(
                "Check the registry reference syntax and ensure the registry "
                "is configured (run 'conductor registry list')."
            ),
        ) from exc

    if parsed.kind == "file":
        # Step 3: file-path syntax but the file is missing — hand back the
        # candidate so the caller raises a clear "file not found" error.
        return local_path

    # Step 4: registry reference — fetch (or return cached) local path.
    from conductor.registry.cache import fetch_workflow

    # registry_name, registry_entry, and workflow are always set when kind == "registry"
    assert parsed.registry_name is not None  # noqa: S101
    assert parsed.registry_entry is not None  # noqa: S101
    assert parsed.workflow is not None  # noqa: S101

    try:
        # fetch_workflow does blocking disk/network work; keep it off the
        # event loop.
        return await asyncio.to_thread(
            fetch_workflow,
            parsed.registry_name,
            parsed.registry_entry,
            parsed.workflow,
            parsed.ref,
        )
    except RegistryError as exc:
        raise ExecutionError(
            f"Failed to fetch registry sub-workflow '{agent_workflow}' "
            f"(referenced by agent '{agent_name}'): {exc}",
            suggestion=(
                "Check that the registry name and workflow name are correct "
                "and the registry is reachable."
            ),
        ) from exc

async def _execute_subworkflow(
self,
agent: AgentDef,
Expand Down Expand Up @@ -756,9 +856,9 @@ async def _execute_subworkflow(
else:
base_dir = Path.cwd()

sub_path = (base_dir / agent.workflow).resolve()
sub_path = await self._resolve_subworkflow_path(agent.workflow, agent.name, base_dir)

if not sub_path.exists():
if not sub_path.is_file():
raise ExecutionError(
f"Sub-workflow file not found: {sub_path} (referenced by agent '{agent.name}')",
suggestion="Check that the 'workflow' path is correct and the file exists.",
Expand Down Expand Up @@ -867,9 +967,9 @@ async def _execute_subworkflow_with_inputs(
else:
base_dir = Path.cwd()

sub_path = (base_dir / agent.workflow).resolve()
sub_path = await self._resolve_subworkflow_path(agent.workflow, agent.name, base_dir)

if not sub_path.exists():
if not sub_path.is_file():
raise ExecutionError(
f"Sub-workflow file not found: {sub_path} (referenced by agent '{agent.name}')",
suggestion="Check that the 'workflow' path is correct and the file exists.",
Expand Down
Loading
Loading