From 2b7de76def88b7e3b4aa9cc433c2eec404917b55 Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Thu, 14 May 2026 09:30:19 -0400 Subject: [PATCH 1/4] feat(engine): resolve registry references in sub-workflow workflow: field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends type: workflow agents to accept registry references (workflow[@registry][#ref] syntax) in the workflow: field, not just local file paths. Resolves issue #172. Resolution order in _resolve_subworkflow_path(): 1. If agent_workflow resolves to an existing file relative to the parent workflow directory, return that path (backward compat — preserves extensionless local file references like 'analysis'). 2. Parse via registry/resolver.resolve_ref() to detect file vs registry syntax. 3. If file-kind, return the (non-existent) path so the caller emits a clear "file not found" error. 4. If registry-kind, call cache.fetch_workflow() via asyncio.to_thread() to fetch and cache the workflow, returning the local cached path. Both _execute_subworkflow and _execute_subworkflow_with_inputs use the new helper. RegistryError from resolve_ref() and fetch_workflow() are wrapped in ExecutionError with agent context. conductor validate now resolves registry refs for all type: workflow agents (including for_each inline agents) when workflow_path is provided. Validation is recursive with cycle detection and a depth cap of 10. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/validator.py | 158 ++++++++++++++++++ src/conductor/engine/workflow.py | 99 ++++++++++- tests/test_config/test_validator.py | 232 ++++++++++++++++++++++++++ tests/test_engine/test_subworkflow.py | 223 +++++++++++++++++++++++++ uv.lock | 2 +- 5 files changed, 709 insertions(+), 5 deletions(-) diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 7ba1d1b..4b92f76 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -193,6 +193,14 @@ def validate_workflow_config( "inline agent. Script steps cannot be used in for_each groups." ) + # Validate sub-workflow references (local paths and registry refs). + # Skipped when workflow_path is not provided — relative paths cannot be + # resolved without knowing the file's location. + if workflow_path is not None: + sub_errors, sub_warnings = _validate_subworkflow_refs(config, workflow_path) + errors.extend(sub_errors) + warnings.extend(sub_warnings) + # Validate workflow output references output_errors = _validate_output_references( config.output, @@ -749,6 +757,156 @@ def _collect_template_strings( return templates +# Maximum depth for recursive sub-workflow validation to prevent infinite loops. +_MAX_SUBWORKFLOW_VALIDATION_DEPTH = 10 + + +def _validate_subworkflow_refs( + config: WorkflowConfig, + workflow_path: Path | None, + _visited: frozenset[str] | None = None, + _depth: int = 0, +) -> tuple[list[str], list[str]]: + """Validate all ``type: workflow`` agent references in *config*. + + For local paths, checks that the file exists. For registry references, + fetches the workflow to the local cache and recursively validates the + full composition tree. Cycle detection prevents infinite recursion. + + Args: + config: The workflow configuration to validate. 
+ workflow_path: Path of the workflow file being validated (used as the + base directory for relative sub-workflow paths). + _visited: Set of already-visited canonical paths (for cycle detection). + Callers should leave this as ``None``; it is threaded through + recursive calls. + _depth: Current recursion depth (internal). + + Returns: + Tuple of (error messages, warning messages). + """ + if _visited is None: + _visited = frozenset() + + errors: list[str] = [] + warnings: list[str] = [] + + if _depth >= _MAX_SUBWORKFLOW_VALIDATION_DEPTH: + return errors, warnings + + base_dir = workflow_path.resolve().parent if workflow_path is not None else Path.cwd() + + # Collect all (agent_name, workflow_ref, context_label) tuples to validate. + candidates: list[tuple[str, str, str]] = [] + for agent in config.agents: + if agent.type == "workflow" and agent.workflow: + candidates.append((agent.name, agent.workflow, f"agent '{agent.name}'")) + for fe in config.for_each: + agent = fe.agent + if agent.type == "workflow" and agent.workflow: + candidates.append( + (agent.name, agent.workflow, f"for_each group '{fe.name}' agent '{agent.name}'") + ) + + for _agent_name, workflow_ref, label in candidates: + sub_path, ref_errors = _resolve_subworkflow_ref_for_validation( + workflow_ref, label, base_dir + ) + errors.extend(ref_errors) + if sub_path is None: + continue + + canonical = str(sub_path) + if canonical in _visited: + errors.append( + f"{label}: circular sub-workflow reference detected " + f"('{workflow_ref}' → '{sub_path}' is already in the validation chain)" + ) + continue + + # Recursively validate the sub-workflow. 
+ try: + from conductor.config.loader import load_config + + sub_config = load_config(sub_path) + except Exception as exc: + errors.append(f"{label}: failed to load sub-workflow '{sub_path}': {exc}") + continue + + try: + sub_warnings = validate_workflow_config( + sub_config, + workflow_path=sub_path, + ) + warnings.extend(f"{label} → sub-workflow '{sub_path.name}': {w}" for w in sub_warnings) + except Exception as exc: + # validate_workflow_config raises ConfigurationError on failure. + errors.append(f"{label}: sub-workflow '{sub_path.name}' failed validation: {exc}") + + return errors, warnings + + +def _resolve_subworkflow_ref_for_validation( + workflow_ref: str, + label: str, + base_dir: Path, +) -> tuple[Path | None, list[str]]: + """Resolve a ``workflow:`` field value to a local path for validation. + + Mirrors the engine's ``_resolve_subworkflow_path`` but is synchronous and + returns errors as a list rather than raising. + + Args: + workflow_ref: The raw ``workflow:`` field value. + label: Human-readable context for error messages. + base_dir: Base directory for relative path resolution. + + Returns: + Tuple of (resolved path or None on error, list of error strings). + """ + from conductor.registry.errors import RegistryError + from conductor.registry.resolver import resolve_ref + + errors: list[str] = [] + + # Check for an existing file beside the parent workflow first. + candidate = (base_dir / workflow_ref).resolve() + if candidate.is_file(): + return candidate, errors + + try: + resolved = resolve_ref(workflow_ref) + except RegistryError as exc: + errors.append(f"{label}: invalid sub-workflow reference '{workflow_ref}': {exc}") + return None, errors + + if resolved.kind == "file": + # File-path syntax but file does not exist. + errors.append(f"{label}: sub-workflow file not found: '{candidate}'") + return None, errors + + # Registry reference: fetch (uses cache; makes network request on first access). 
+ from conductor.registry.cache import fetch_workflow + + # registry_name, registry_entry, and workflow are always set when kind == "registry" + assert resolved.registry_name is not None # noqa: S101 + assert resolved.registry_entry is not None # noqa: S101 + assert resolved.workflow is not None # noqa: S101 + + try: + sub_path = fetch_workflow( + resolved.registry_name, + resolved.registry_entry, + resolved.workflow, + resolved.ref, + ) + except RegistryError as exc: + errors.append(f"{label}: failed to fetch registry sub-workflow '{workflow_ref}': {exc}") + return None, errors + + return sub_path, errors + + def _validate_template_references( config: WorkflowConfig, workflow_path: Path | None = None, diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 5c965cc..130542f 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -703,6 +703,97 @@ def _build_subworkflow_inputs( workflow_ctx = context.get("workflow", {}) return dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} + async def _resolve_subworkflow_path( + self, + agent_workflow: str, + agent_name: str, + base_dir: Path, + ) -> Path: + """Resolve a sub-workflow reference to a local filesystem path. + + Handles both local file paths and registry references + (``workflow[@registry][#ref]`` syntax). + + Resolution order: + 1. If ``agent_workflow`` resolves to an existing file relative to + ``base_dir``, return that path immediately. This preserves + backward-compatibility for bare names like ``analysis`` that refer + to a sibling file without a ``.yaml`` extension. + 2. Otherwise, parse as a registry reference via + :func:`~conductor.registry.resolver.resolve_ref`. + 3. If the parsed ref is still a file kind (e.g. a path with a + ``.yaml`` extension that does not exist), return the resolved + path so the caller can emit a clear "file not found" error. + 4. 
For registry refs, fetch the workflow (with caching) and return + the cached local path. + + Args: + agent_workflow: The ``workflow:`` field value from the agent def. + agent_name: Name of the containing agent (used in error messages). + base_dir: Directory of the parent workflow file used for relative + path resolution. + + Returns: + Absolute path to the workflow YAML (local or cached registry copy). + + Raises: + ExecutionError: If the registry reference is malformed, names an + unknown registry, or the registry fetch fails. + """ + from conductor.registry.errors import RegistryError + from conductor.registry.resolver import resolve_ref + + # Step 1: check for an existing file relative to base_dir first. + # This ensures "analysis" beside the parent workflow is treated as a + # local file, not a registry lookup, even though it has no extension. + candidate = (base_dir / agent_workflow).resolve() + if candidate.is_file(): + return candidate + + # Step 2: heuristic parse — file-looking path or registry reference? + try: + resolved = resolve_ref(agent_workflow) + except RegistryError as exc: + raise ExecutionError( + f"Failed to resolve sub-workflow '{agent_workflow}' " + f"(referenced by agent '{agent_name}'): {exc}", + suggestion=( + "Check the registry reference syntax and ensure the registry " + "is configured (run 'conductor registry list')." + ), + ) from exc + + if resolved.kind == "file": + # Step 3: file-path syntax but file does not exist — return the + # candidate path so the caller emits a clear "file not found" error. + return candidate + + # Step 4: registry reference — fetch (or return cached) local path. 
+ from conductor.registry.cache import fetch_workflow + + # registry_name, registry_entry, and workflow are always set when kind == "registry" + assert resolved.registry_name is not None # noqa: S101 + assert resolved.registry_entry is not None # noqa: S101 + assert resolved.workflow is not None # noqa: S101 + + try: + return await asyncio.to_thread( + fetch_workflow, + resolved.registry_name, + resolved.registry_entry, + resolved.workflow, + resolved.ref, + ) + except RegistryError as exc: + raise ExecutionError( + f"Failed to fetch registry sub-workflow '{agent_workflow}' " + f"(referenced by agent '{agent_name}'): {exc}", + suggestion=( + "Check that the registry name and workflow name are correct " + "and the registry is reachable." + ), + ) from exc + async def _execute_subworkflow( self, agent: AgentDef, @@ -756,9 +847,9 @@ async def _execute_subworkflow( else: base_dir = Path.cwd() - sub_path = (base_dir / agent.workflow).resolve() + sub_path = await self._resolve_subworkflow_path(agent.workflow, agent.name, base_dir) - if not sub_path.exists(): + if not sub_path.is_file(): raise ExecutionError( f"Sub-workflow file not found: {sub_path} (referenced by agent '{agent.name}')", suggestion="Check that the 'workflow' path is correct and the file exists.", @@ -867,9 +958,9 @@ async def _execute_subworkflow_with_inputs( else: base_dir = Path.cwd() - sub_path = (base_dir / agent.workflow).resolve() + sub_path = await self._resolve_subworkflow_path(agent.workflow, agent.name, base_dir) - if not sub_path.exists(): + if not sub_path.is_file(): raise ExecutionError( f"Sub-workflow file not found: {sub_path} (referenced by agent '{agent.name}')", suggestion="Check that the 'workflow' path is correct and the file exists.", diff --git a/tests/test_config/test_validator.py b/tests/test_config/test_validator.py index 21d8fa8..10297e5 100644 --- a/tests/test_config/test_validator.py +++ b/tests/test_config/test_validator.py @@ -1172,3 +1172,235 @@ def 
test_extractor_catches_stale_ref_in_input_mapping(self) -> None: agent_refs, input_refs = _extract_template_refs("{{ old_agent.output.findings }}") assert "old_agent" in agent_refs assert not input_refs + + +class TestSubWorkflowRefValidation: + """Tests for _validate_subworkflow_refs in validate_workflow_config.""" + + def _make_config(self, workflow_ref: str) -> WorkflowConfig: + from conductor.config.schema import LimitsConfig, RuntimeConfig + + return WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow=workflow_ref, + routes=[RouteDef(to="$end")], + ), + ], + ) + + def test_local_sub_workflow_validates_ok(self, tmp_path: Path) -> None: + """Local file sub-workflow passes validation when file exists.""" + import textwrap + + from conductor.config.validator import validate_workflow_config + + sub = tmp_path / "sub.yaml" + sub.write_text( + textwrap.dedent("""\ + workflow: + name: sub + entry_point: step + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: step + type: agent + prompt: go + routes: + - to: "$end" + output: {} + """), + encoding="utf-8", + ) + parent = tmp_path / "parent.yaml" + parent.write_text("dummy", encoding="utf-8") + + config = self._make_config("./sub.yaml") + warnings = validate_workflow_config(config, workflow_path=parent) + assert warnings == [] + + def test_missing_local_sub_workflow_errors(self, tmp_path: Path) -> None: + """Missing local file sub-workflow produces a validation error.""" + from conductor.config.validator import validate_workflow_config + from conductor.exceptions import ConfigurationError + + parent = tmp_path / "parent.yaml" + parent.write_text("dummy", encoding="utf-8") + + config = self._make_config("./nonexistent.yaml") + with 
pytest.raises(ConfigurationError, match="sub-workflow file not found"): + validate_workflow_config(config, workflow_path=parent) + + def test_malformed_registry_ref_errors(self, tmp_path: Path) -> None: + """Malformed registry reference (two '@') produces a validation error.""" + from conductor.config.validator import validate_workflow_config + from conductor.exceptions import ConfigurationError + + parent = tmp_path / "parent.yaml" + parent.write_text("dummy", encoding="utf-8") + + config = self._make_config("a@b@c") # two '@' — malformed + with pytest.raises(ConfigurationError, match="invalid sub-workflow reference"): + validate_workflow_config(config, workflow_path=parent) + + def test_registry_ref_validates_fetched_workflow(self, tmp_path: Path) -> None: + """Registry reference fetches the workflow and validates it recursively.""" + import textwrap + from unittest.mock import patch + + from conductor.config.validator import validate_workflow_config + from conductor.registry.config import RegistryEntry, RegistryType + from conductor.registry.resolver import ResolvedRef + + cached_sub = tmp_path / "fetched.yaml" + cached_sub.write_text( + textwrap.dedent("""\ + workflow: + name: fetched + entry_point: step + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: step + type: agent + prompt: go + routes: + - to: "$end" + output: {} + """), + encoding="utf-8", + ) + + parent = tmp_path / "parent.yaml" + parent.write_text("dummy", encoding="utf-8") + + fake_entry = RegistryEntry(type=RegistryType.github, source="https://github.com/x/y") + fake_resolved = ResolvedRef( + kind="registry", + workflow="fetched", + registry_name="team-a", + ref="v1.0.0", + registry_entry=fake_entry, + ) + + config = self._make_config("fetched@team-a#v1.0.0") + with ( + patch("conductor.registry.resolver.resolve_ref", return_value=fake_resolved), + patch("conductor.registry.cache.fetch_workflow", return_value=cached_sub), + ): + warnings = 
validate_workflow_config(config, workflow_path=parent) + + assert warnings == [] + + def test_registry_fetch_failure_errors(self, tmp_path: Path) -> None: + """Registry fetch failure during validation produces a clear error.""" + from unittest.mock import patch + + from conductor.config.validator import validate_workflow_config + from conductor.exceptions import ConfigurationError + from conductor.registry.config import RegistryEntry, RegistryType + from conductor.registry.errors import RegistryError + from conductor.registry.resolver import ResolvedRef + + parent = tmp_path / "parent.yaml" + parent.write_text("dummy", encoding="utf-8") + + fake_entry = RegistryEntry(type=RegistryType.github, source="https://github.com/x/y") + fake_resolved = ResolvedRef( + kind="registry", + workflow="missing", + registry_name="team-a", + ref="v1.0.0", + registry_entry=fake_entry, + ) + + config = self._make_config("missing@team-a#v1.0.0") + with ( + patch("conductor.registry.resolver.resolve_ref", return_value=fake_resolved), + patch( + "conductor.registry.cache.fetch_workflow", + side_effect=RegistryError("workflow not found"), + ), + pytest.raises(ConfigurationError, match="failed to fetch registry sub-workflow"), + ): + validate_workflow_config(config, workflow_path=parent) + + def test_for_each_workflow_agent_ref_validated(self, tmp_path: Path) -> None: + """Registry ref inside a for_each inline workflow agent is validated.""" + from unittest.mock import patch + + from conductor.config.schema import ForEachDef, LimitsConfig, RuntimeConfig + from conductor.config.validator import validate_workflow_config + from conductor.exceptions import ConfigurationError + from conductor.registry.config import RegistryEntry, RegistryType + from conductor.registry.errors import RegistryError + from conductor.registry.resolver import ResolvedRef + + parent = tmp_path / "parent.yaml" + parent.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + 
name="parent", + entry_point="batch", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="loader", + type="agent", + prompt="load items", + routes=[RouteDef(to="batch")], + ), + ], + for_each=[ + ForEachDef( + name="batch", + type="for_each", + source="loader.output.items", + **{"as": "item"}, + agent=AgentDef( + name="worker", + type="workflow", + workflow="missing@team-a#v1.0.0", + routes=[RouteDef(to="$end")], + ), + routes=[RouteDef(to="$end")], + ) + ], + ) + + fake_entry = RegistryEntry(type=RegistryType.github, source="https://github.com/x/y") + fake_resolved = ResolvedRef( + kind="registry", + workflow="missing", + registry_name="team-a", + ref="v1.0.0", + registry_entry=fake_entry, + ) + + with ( + patch("conductor.registry.resolver.resolve_ref", return_value=fake_resolved), + patch( + "conductor.registry.cache.fetch_workflow", + side_effect=RegistryError("workflow not found"), + ), + pytest.raises(ConfigurationError, match="failed to fetch registry sub-workflow"), + ): + validate_workflow_config(config, workflow_path=parent) diff --git a/tests/test_engine/test_subworkflow.py b/tests/test_engine/test_subworkflow.py index aca54e4..7e164e1 100644 --- a/tests/test_engine/test_subworkflow.py +++ b/tests/test_engine/test_subworkflow.py @@ -1468,3 +1468,226 @@ def _handler(agent, prompt, context): ("batch[1]",), ("batch[2]",), } + + +class TestRegistrySubWorkflowResolution: + """Tests for _resolve_subworkflow_path with registry references.""" + + @pytest.mark.asyncio + async def test_registry_ref_resolved_and_executed( + self, tmp_workflow_dir: Path, tmp_path: Path + ) -> None: + """Registry reference fetches workflow and executes it.""" + from unittest.mock import AsyncMock, patch + + # Write a real sub-workflow to a temp cache location + cached_sub = tmp_path / "sub.yaml" + _write_yaml( + cached_sub, + """\ + workflow: + name: sub-from-registry + 
entry_point: inner + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: inner + type: agent + prompt: do it + routes: + - to: "$end" + output: + result: "{{ inner.output.result }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="analysis@team-a#v1.0.0", + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ sub_wf.output.result }}"}, + ) + + call_count = 0 + + def mock_handler(agent, prompt, context): + nonlocal call_count + call_count += 1 + return {"result": "registry-result"} + + from conductor.providers.copilot import CopilotProvider + + provider = CopilotProvider(mock_handler=mock_handler) + + with patch( + "conductor.engine.workflow.WorkflowEngine._resolve_subworkflow_path", + new_callable=AsyncMock, + return_value=cached_sub, + ): + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + result = await engine.run({}) + + assert result.get("result") == "registry-result" + + @pytest.mark.asyncio + async def test_registry_fetch_failure_raises_execution_error( + self, tmp_workflow_dir: Path + ) -> None: + """Registry fetch failure is wrapped in ExecutionError with agent context.""" + from unittest.mock import patch + + from conductor.exceptions import ExecutionError + from conductor.registry.errors import RegistryError + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + 
agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="missing@unknown-registry#v1.0.0", + routes=[RouteDef(to="$end")], + ), + ], + ) + + mock_provider = MagicMock() + engine = WorkflowEngine(config, mock_provider, workflow_path=parent_path) + + # Patch resolve_ref to return a registry kind, then patch fetch_workflow to fail + from conductor.registry.config import RegistryEntry, RegistryType + from conductor.registry.resolver import ResolvedRef + + fake_entry = RegistryEntry(type=RegistryType.github, source="https://github.com/x/y") + fake_resolved = ResolvedRef( + kind="registry", + workflow="missing", + registry_name="unknown-registry", + ref="v1.0.0", + registry_entry=fake_entry, + ) + + with ( + patch("conductor.registry.resolver.resolve_ref", return_value=fake_resolved), + patch( + "conductor.registry.cache.fetch_workflow", + side_effect=RegistryError("not found"), + ), + pytest.raises(ExecutionError, match="Failed to fetch registry sub-workflow"), + ): + await engine.run({}) + + @pytest.mark.asyncio + async def test_local_file_takes_precedence_over_registry(self, tmp_workflow_dir: Path) -> None: + """An extensionless name that matches a local file is not treated as registry ref.""" + # Create a local file named "analysis" (no extension) beside the parent + analysis_path = tmp_workflow_dir / "analysis" + _write_yaml( + analysis_path, + """\ + workflow: + name: analysis + entry_point: step + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: step + type: agent + prompt: analyze + routes: + - to: "$end" + output: + result: "{{ step.output.result }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + 
name="sub_wf", + type="workflow", + workflow="analysis", # extensionless — local file wins + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ sub_wf.output.result }}"}, + ) + + from conductor.providers.copilot import CopilotProvider + + provider = CopilotProvider(mock_handler=lambda agent, prompt, context: {"result": "local"}) + + # No registry mock needed — should resolve purely via local file check + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + result = await engine.run({}) + assert result.get("result") == "local" + + @pytest.mark.asyncio + async def test_malformed_registry_ref_raises_execution_error( + self, tmp_workflow_dir: Path + ) -> None: + """Malformed registry ref raises ExecutionError with helpful message.""" + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="a@b@c", # two '@' signs — malformed + routes=[RouteDef(to="$end")], + ), + ], + ) + + mock_provider = MagicMock() + engine = WorkflowEngine(config, mock_provider, workflow_path=parent_path) + + with pytest.raises(ExecutionError, match="Failed to resolve sub-workflow"): + await engine.run({}) diff --git a/uv.lock b/uv.lock index abc7aea..b7b4d9b 100644 --- a/uv.lock +++ b/uv.lock @@ -150,7 +150,7 @@ wheels = [ [[package]] name = "conductor-cli" -version = "0.1.14" +version = "0.1.15" source = { editable = "." 
} dependencies = [ { name = "anthropic" }, From cf10306f35449548d39bdd7889abfe60a8f36157 Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Thu, 14 May 2026 09:42:54 -0400 Subject: [PATCH 2/4] fix(validator): thread cycle-detection state through recursive validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous implementation built a `_visited` set in `_validate_subworkflow_refs` but called `validate_workflow_config` recursively without threading the set through. Each recursive level started with a fresh empty `_visited`, so cycles like A → B → A were not detected and would either recurse to the depth cap silently or loop until an unrelated failure (e.g. fetch error). Adds two internal-only kwargs to `validate_workflow_config` (`_visited_subworkflows`, `_subworkflow_depth`) that are threaded through recursive validation so the visited set accumulates across the full sub-workflow chain. Adds a test exercising A → B → A. Also adds a docstring note to `_resolve_subworkflow_path` explaining that mutable registry refs (e.g. `name@registry#main`, or no `#ref`) may resolve to a different commit on `conductor resume` if the upstream branch has moved. Pinned tags or commit SHAs guarantee deterministic resume. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/validator.py | 20 ++++++- src/conductor/engine/workflow.py | 9 ++++ tests/test_config/test_validator.py | 81 +++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 1 deletion(-) diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 4b92f76..0d8f38e 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -88,6 +88,9 @@ def get(self, key: str, default: object = None) -> object: def validate_workflow_config( config: WorkflowConfig, workflow_path: Path | None = None, + *, + _visited_subworkflows: frozenset[str] | None = None, + _subworkflow_depth: int = 0, ) -> list[str]: """Perform comprehensive validation of a workflow configuration. @@ -98,6 +101,12 @@ def validate_workflow_config( Args: config: The WorkflowConfig to validate. workflow_path: Optional path to the workflow file (for !file resolution). + _visited_subworkflows: Internal — set of canonical sub-workflow paths + already on the validation stack, used for cycle detection in + recursive sub-workflow validation. External callers should leave + this as ``None``. + _subworkflow_depth: Internal — current recursion depth for + sub-workflow validation. External callers should leave this as 0. Returns: A list of warning messages (non-fatal issues). @@ -197,7 +206,12 @@ def validate_workflow_config( # Skipped when workflow_path is not provided — relative paths cannot be # resolved without knowing the file's location. 
if workflow_path is not None: - sub_errors, sub_warnings = _validate_subworkflow_refs(config, workflow_path) + sub_errors, sub_warnings = _validate_subworkflow_refs( + config, + workflow_path, + _visited=_visited_subworkflows, + _depth=_subworkflow_depth, + ) errors.extend(sub_errors) warnings.extend(sub_warnings) @@ -834,9 +848,13 @@ def _validate_subworkflow_refs( continue try: + # Thread _visited and _depth through validate_workflow_config so + # nested sub-workflow validation also gets cycle detection. sub_warnings = validate_workflow_config( sub_config, workflow_path=sub_path, + _visited_subworkflows=_visited | {canonical}, + _subworkflow_depth=_depth + 1, ) warnings.extend(f"{label} → sub-workflow '{sub_path.name}': {w}" for w in sub_warnings) except Exception as exc: diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 130542f..44994ef 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -727,6 +727,15 @@ async def _resolve_subworkflow_path( 4. For registry refs, fetch the workflow (with caching) and return the cached local path. + Note on checkpoint/resume: this helper is called on every + sub-workflow execution, including after :meth:`resume`. Pinned + registry refs (``name@registry#v1.2.3`` or ``name@registry#abc1234``) + always resolve to the same cached path. Mutable refs + (``name@registry#main`` or no ``#ref`` defaulting to "latest") may + resolve to a different commit on resume if the upstream branch has + moved. Use pinned tags or commit SHAs in production workflows when + deterministic resume is required. + Args: agent_workflow: The ``workflow:`` field value from the agent def. agent_name: Name of the containing agent (used in error messages). 
diff --git a/tests/test_config/test_validator.py b/tests/test_config/test_validator.py index 10297e5..25ecbc4 100644 --- a/tests/test_config/test_validator.py +++ b/tests/test_config/test_validator.py @@ -1404,3 +1404,84 @@ def test_for_each_workflow_agent_ref_validated(self, tmp_path: Path) -> None: pytest.raises(ConfigurationError, match="failed to fetch registry sub-workflow"), ): validate_workflow_config(config, workflow_path=parent) + + def test_circular_subworkflow_ref_detected(self, tmp_path: Path) -> None: + """Circular sub-workflow references (A → B → A) are caught during validation. + + Without cycle detection, recursive validation would loop indefinitely. + With it, validation produces a clear "circular reference" error. + """ + import textwrap + + from conductor.config.schema import LimitsConfig, RuntimeConfig + from conductor.config.validator import validate_workflow_config + from conductor.exceptions import ConfigurationError + + # B references A + b_yaml = tmp_path / "b.yaml" + b_yaml.write_text( + textwrap.dedent("""\ + workflow: + name: b + entry_point: ref_a + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: ref_a + type: workflow + workflow: ./a.yaml + routes: + - to: "$end" + output: {} + """), + encoding="utf-8", + ) + + # A references B + a_yaml = tmp_path / "a.yaml" + a_yaml.write_text( + textwrap.dedent("""\ + workflow: + name: a + entry_point: ref_b + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: ref_b + type: workflow + workflow: ./b.yaml + routes: + - to: "$end" + output: {} + """), + encoding="utf-8", + ) + + # Parent references A, which kicks off the A → B → A cycle + parent = tmp_path / "parent.yaml" + parent.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), 
+ agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="./a.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + + with pytest.raises(ConfigurationError, match="circular sub-workflow reference"): + validate_workflow_config(config, workflow_path=parent) From 865f08179a31d7e7327b793b663791905cd474d9 Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Thu, 14 May 2026 09:48:54 -0400 Subject: [PATCH 3/4] test(engine): verify resume re-resolves registry sub-workflow refs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a full round-trip test for the documented resume + registry ref behavior: fail mid-sub-workflow → checkpoint → resume → success. The test verifies that: 1. fetch_workflow is called again on resume (resolution is not memoized on the engine instance), and 2. for a stable cached ref, the resolved path is identical between the original run and the resumed run. This is the determinism guarantee for cached/SHA-pinned registry sub-workflow refs across resume. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/test_engine/test_subworkflow.py | 147 ++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/tests/test_engine/test_subworkflow.py b/tests/test_engine/test_subworkflow.py index 7e164e1..2ae717b 100644 --- a/tests/test_engine/test_subworkflow.py +++ b/tests/test_engine/test_subworkflow.py @@ -1691,3 +1691,150 @@ async def test_malformed_registry_ref_raises_execution_error( with pytest.raises(ExecutionError, match="Failed to resolve sub-workflow"): await engine.run({}) + + @pytest.mark.asyncio + async def test_resume_re_resolves_registry_ref_to_same_path( + self, tmp_workflow_dir: Path, tmp_path: Path + ) -> None: + """On resume, a registry sub-workflow ref is re-resolved cleanly. 
+ + Verifies the documented compatibility behavior: ``_resolve_subworkflow_path`` + is called again during ``engine.resume()``, and for a SHA-pinned (or cached) + registry ref, ``fetch_workflow`` returns the same local cached path it + returned on the original run. This is the determinism guarantee for resume. + """ + from unittest.mock import patch + + from conductor.engine.checkpoint import CheckpointManager + from conductor.engine.context import WorkflowContext + from conductor.engine.limits import LimitEnforcer + from conductor.exceptions import ProviderError + from conductor.providers.copilot import CopilotProvider + from conductor.registry.config import RegistryEntry, RegistryType + from conductor.registry.resolver import ResolvedRef + + # Set up a real cached sub-workflow file + cached_sub = tmp_path / "cache" / "team-a" / "analysis" / "abcdef123456" / "analysis.yaml" + cached_sub.parent.mkdir(parents=True) + _write_yaml( + cached_sub, + """\ + workflow: + name: analysis + entry_point: do_work + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: do_work + type: agent + prompt: analyze + routes: + - to: "$end" + output: + result: "{{ do_work.output.result }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="planner", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="planner", + type="agent", + prompt="plan", + routes=[RouteDef(to="sub_wf")], + ), + AgentDef( + name="sub_wf", + type="workflow", + workflow="analysis@team-a#v1.0.0", + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ sub_wf.output.result }}"}, + ) + + # Track fetch_workflow calls and the paths returned on each call + fetch_call_paths: list[Path] = [] + + def tracked_fetch(*args, **kwargs): + 
fetch_call_paths.append(cached_sub) + return cached_sub + + fake_entry = RegistryEntry(type=RegistryType.github, source="https://github.com/x/y") + fake_resolved = ResolvedRef( + kind="registry", + workflow="analysis", + registry_name="team-a", + ref="v1.0.0", + registry_entry=fake_entry, + ) + + # First run: planner succeeds, sub_wf inner agent fails + run_count = {"do_work": 0} + + def failing_handler(agent, prompt, context): + if agent.name == "planner": + return {"plan": "do analysis"} + if agent.name == "do_work": + run_count["do_work"] += 1 + if run_count["do_work"] == 1: + raise ProviderError("transient failure") + return {"result": "analysis-complete"} + return {} + + provider = CopilotProvider(mock_handler=failing_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + + with ( + patch("conductor.registry.resolver.resolve_ref", return_value=fake_resolved), + patch("conductor.registry.cache.fetch_workflow", side_effect=tracked_fetch), + patch.object(CheckpointManager, "get_checkpoints_dir", return_value=tmp_path), + pytest.raises(ProviderError, match="transient failure"), + ): + await engine.run({}) + + # Sub-workflow was reached (fetch was called) but inner agent failed + assert len(fetch_call_paths) == 1, "fetch_workflow should have been called once" + first_path = fetch_call_paths[0] + + checkpoint_path = engine._last_checkpoint_path + assert checkpoint_path is not None + + # Resume: re-create engine, restore state, run again + cp = CheckpointManager.load_checkpoint(checkpoint_path) + engine2 = WorkflowEngine(config, provider, workflow_path=parent_path) + engine2.set_context(WorkflowContext.from_dict(cp.context)) + engine2.set_limits( + LimitEnforcer.from_dict( + cp.limits, + timeout_seconds=config.workflow.limits.timeout_seconds, + ) + ) + + with ( + patch("conductor.registry.resolver.resolve_ref", return_value=fake_resolved), + patch("conductor.registry.cache.fetch_workflow", side_effect=tracked_fetch), + 
patch.object(CheckpointManager, "get_checkpoints_dir", return_value=tmp_path), + ): + result = await engine2.resume(cp.current_agent) + + # fetch_workflow was called again on resume — and returned the SAME + # cached path. This is the deterministic-resume guarantee for cached + # registry refs. + assert len(fetch_call_paths) == 2, "fetch_workflow should be called again on resume" + assert fetch_call_paths[1] == first_path, ( + "resume must resolve the registry ref to the same cached path as the original run" + ) + assert result["result"] == "analysis-complete" From b0c63173c0496f5566aec49d9ec63a48acef68bc Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Thu, 14 May 2026 10:06:20 -0400 Subject: [PATCH 4/4] fix: address PR review feedback for registry sub-workflow refs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code-review findings: * **Cycle detection now uses inode identity** (`(st_dev, st_ino)`) instead of `str(Path.resolve())`. The previous string-based approach still caught cycles eventually but treated case-variants of the same file as distinct entries on case-insensitive filesystems (macOS, Windows). Inode-based identity catches the cycle on the first revisit and correctly handles symlinks too. Adds a test that exercises the case-variant scenario (skipped on case-sensitive FS). * **Depth-limit silent return now emits a warning.** Previously `_validate_subworkflow_refs` returned empty errors/warnings when `_MAX_SUBWORKFLOW_VALIDATION_DEPTH` was hit, so the user couldn't tell whether the workflow was clean or whether validation was truncated. Adds a regression test that builds a chain longer than the limit and asserts the warning surfaces. * **Bare `except Exception` narrowed to `except ConfigurationError`** for the recursive `validate_workflow_config` call (the only exception that recursive call documents raising). 
The `load_config` catch stays broad — load can fail many ways (YAML parse errors, schema errors, file IO). Test-quality findings: * `test_registry_ref_resolved_and_executed` no longer mocks `_resolve_subworkflow_path` — it now mocks only the registry config loader and `fetch_workflow`, so the engine's real resolution flow runs end-to-end (real `resolve_ref` parses `analysis@team-a#v1.0.0`, real precedence check confirms no local file shadows it, real registry branch is taken). * `test_registry_ref_validates_fetched_workflow` no longer mocks `resolve_ref` — it captures `fetch_workflow` arguments and asserts the parsed workflow name, registry name, and ref are correct, exercising real parser logic. * `test_local_file_takes_precedence_over_registry` now patches `resolve_ref` and asserts it is not called when a local file exists, guaranteeing the precedence short-circuit is preserved. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/validator.py | 51 ++++-- tests/test_config/test_validator.py | 229 ++++++++++++++++++++++++-- tests/test_engine/test_subworkflow.py | 49 ++++-- 3 files changed, 290 insertions(+), 39 deletions(-) diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 0d8f38e..318f720 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -89,7 +89,7 @@ def validate_workflow_config( config: WorkflowConfig, workflow_path: Path | None = None, *, - _visited_subworkflows: frozenset[str] | None = None, + _visited_subworkflows: frozenset[tuple[int, int]] | None = None, _subworkflow_depth: int = 0, ) -> list[str]: """Perform comprehensive validation of a workflow configuration. @@ -101,10 +101,10 @@ def validate_workflow_config( Args: config: The WorkflowConfig to validate. workflow_path: Optional path to the workflow file (for !file resolution). 
- _visited_subworkflows: Internal — set of canonical sub-workflow paths - already on the validation stack, used for cycle detection in - recursive sub-workflow validation. External callers should leave - this as ``None``. + _visited_subworkflows: Internal — set of canonical (st_dev, st_ino) + tuples for sub-workflow files already on the validation stack, + used for cycle detection in recursive sub-workflow validation. + External callers should leave this as ``None``. _subworkflow_depth: Internal — current recursion depth for sub-workflow validation. External callers should leave this as 0. @@ -778,23 +778,29 @@ def _collect_template_strings( def _validate_subworkflow_refs( config: WorkflowConfig, workflow_path: Path | None, - _visited: frozenset[str] | None = None, + _visited: frozenset[tuple[int, int]] | None = None, _depth: int = 0, ) -> tuple[list[str], list[str]]: """Validate all ``type: workflow`` agent references in *config*. For local paths, checks that the file exists. For registry references, fetches the workflow to the local cache and recursively validates the - full composition tree. Cycle detection prevents infinite recursion. + full composition tree. Cycle detection uses inode identity so that the + same file referenced via different cases (on case-insensitive + filesystems like macOS/Windows) or via symlinks resolves to the same + canonical key. Args: config: The workflow configuration to validate. workflow_path: Path of the workflow file being validated (used as the base directory for relative sub-workflow paths). - _visited: Set of already-visited canonical paths (for cycle detection). - Callers should leave this as ``None``; it is threaded through - recursive calls. - _depth: Current recursion depth (internal). + _visited: Set of already-visited canonical (st_dev, st_ino) tuples + for cycle detection. Callers should leave this as ``None``; it is + threaded through recursive calls. + _depth: Current recursion depth (internal). 
When the depth reaches + :data:`_MAX_SUBWORKFLOW_VALIDATION_DEPTH`, recursion stops and a + warning is emitted so callers know the validation tree was + truncated. Returns: Tuple of (error messages, warning messages). @@ -806,6 +812,12 @@ def _validate_subworkflow_refs( warnings: list[str] = [] if _depth >= _MAX_SUBWORKFLOW_VALIDATION_DEPTH: + warnings.append( + f"Sub-workflow validation depth limit " + f"({_MAX_SUBWORKFLOW_VALIDATION_DEPTH}) reached; " + "deeper sub-workflows were not validated. " + "Reduce nesting or check for unintended cycles." + ) return errors, warnings base_dir = workflow_path.resolve().parent if workflow_path is not None else Path.cwd() @@ -830,7 +842,19 @@ def _validate_subworkflow_refs( if sub_path is None: continue - canonical = str(sub_path) + # Use inode identity (st_dev, st_ino) for cycle detection so that the + # same file referenced via different cases (case-insensitive + # filesystems) or different relative paths resolves to one key. + try: + stat = sub_path.stat() + canonical: tuple[int, int] = (stat.st_dev, stat.st_ino) + except OSError as exc: + # Should be rare since _resolve_subworkflow_ref_for_validation + # already returned a path it considered valid, but stat() can + # still fail on some platforms (e.g. permission errors). + errors.append(f"{label}: cannot stat sub-workflow file '{sub_path}': {exc}") + continue + if canonical in _visited: errors.append( f"{label}: circular sub-workflow reference detected " @@ -857,8 +881,7 @@ def _validate_subworkflow_refs( _subworkflow_depth=_depth + 1, ) warnings.extend(f"{label} → sub-workflow '{sub_path.name}': {w}" for w in sub_warnings) - except Exception as exc: - # validate_workflow_config raises ConfigurationError on failure. 
+ except ConfigurationError as exc: errors.append(f"{label}: sub-workflow '{sub_path.name}' failed validation: {exc}") return errors, warnings diff --git a/tests/test_config/test_validator.py b/tests/test_config/test_validator.py index 25ecbc4..805fc66 100644 --- a/tests/test_config/test_validator.py +++ b/tests/test_config/test_validator.py @@ -1256,13 +1256,18 @@ def test_malformed_registry_ref_errors(self, tmp_path: Path) -> None: validate_workflow_config(config, workflow_path=parent) def test_registry_ref_validates_fetched_workflow(self, tmp_path: Path) -> None: - """Registry reference fetches the workflow and validates it recursively.""" + """Registry reference fetches the workflow and validates it recursively. + + Mocks only the ``RegistriesConfig`` loader and ``fetch_workflow`` so that + real ``resolve_ref`` parses ``fetched@team-a#v1.0.0`` end-to-end — + verifying that workflow name, registry name, and ref all extract + correctly. + """ import textwrap from unittest.mock import patch from conductor.config.validator import validate_workflow_config - from conductor.registry.config import RegistryEntry, RegistryType - from conductor.registry.resolver import ResolvedRef + from conductor.registry.config import RegistriesConfig, RegistryEntry, RegistryType cached_sub = tmp_path / "fetched.yaml" cached_sub.write_text( @@ -1288,23 +1293,39 @@ def test_registry_ref_validates_fetched_workflow(self, tmp_path: Path) -> None: parent = tmp_path / "parent.yaml" parent.write_text("dummy", encoding="utf-8") - fake_entry = RegistryEntry(type=RegistryType.github, source="https://github.com/x/y") - fake_resolved = ResolvedRef( - kind="registry", - workflow="fetched", - registry_name="team-a", - ref="v1.0.0", - registry_entry=fake_entry, + # Real registry config so resolve_ref can find "team-a" + registry_config = RegistriesConfig( + registries={ + "team-a": RegistryEntry( + type=RegistryType.github, + source="https://github.com/example/team-a", + ), + }, ) + # Capture 
fetch_workflow args to verify resolve_ref produced the + # right registry name, workflow name, and ref. + captured_args: dict[str, object] = {} + + def capture_fetch(registry_name, registry_entry, workflow_name, ref): + captured_args["registry_name"] = registry_name + captured_args["workflow_name"] = workflow_name + captured_args["ref"] = ref + return cached_sub + config = self._make_config("fetched@team-a#v1.0.0") with ( - patch("conductor.registry.resolver.resolve_ref", return_value=fake_resolved), - patch("conductor.registry.cache.fetch_workflow", return_value=cached_sub), + patch("conductor.registry.resolver.load_config", return_value=registry_config), + patch("conductor.registry.cache.fetch_workflow", side_effect=capture_fetch), ): warnings = validate_workflow_config(config, workflow_path=parent) assert warnings == [] + assert captured_args == { + "registry_name": "team-a", + "workflow_name": "fetched", + "ref": "v1.0.0", + } def test_registry_fetch_failure_errors(self, tmp_path: Path) -> None: """Registry fetch failure during validation produces a clear error.""" @@ -1485,3 +1506,187 @@ def test_circular_subworkflow_ref_detected(self, tmp_path: Path) -> None: with pytest.raises(ConfigurationError, match="circular sub-workflow reference"): validate_workflow_config(config, workflow_path=parent) + + def test_circular_subworkflow_via_case_variant_path(self, tmp_path: Path) -> None: + """Cycle is detected even when references use different case variants. + + On case-insensitive filesystems (macOS, Windows) ``A.yaml`` and + ``a.yaml`` are the same file but ``Path.resolve()`` returns different + strings. The validator uses inode identity ``(st_dev, st_ino)`` for + cycle detection so cases like ``A.yaml → B.yaml → a.yaml`` are caught + on the first revisit, regardless of case used in references. + + Skipped on case-sensitive filesystems where the case-variant + references are genuinely different files. 
+ """ + import textwrap + + from conductor.config.schema import LimitsConfig, RuntimeConfig + from conductor.config.validator import validate_workflow_config + from conductor.exceptions import ConfigurationError + + # Detect case-insensitivity by creating a file and checking if its + # uppercase variant is found. + probe = tmp_path / "_case_probe.yaml" + probe.write_text("x", encoding="utf-8") + case_insensitive = (tmp_path / "_CASE_PROBE.yaml").exists() + probe.unlink() + + if not case_insensitive: + pytest.skip("case-insensitive filesystem required (e.g. macOS, Windows)") + + # Write A.yaml that references B.YAML (uppercase) + a_yaml = tmp_path / "A.yaml" + a_yaml.write_text( + textwrap.dedent("""\ + workflow: + name: a + entry_point: ref_b + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: ref_b + type: workflow + workflow: B.YAML + routes: + - to: "$end" + output: {} + """), + encoding="utf-8", + ) + # Write B.yaml that references a.yaml (lowercase) — same file as A.yaml + # on a case-insensitive FS, completing the cycle + b_yaml = tmp_path / "B.yaml" + b_yaml.write_text( + textwrap.dedent("""\ + workflow: + name: b + entry_point: ref_a + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: ref_a + type: workflow + workflow: a.yaml + routes: + - to: "$end" + output: {} + """), + encoding="utf-8", + ) + + parent = tmp_path / "parent.yaml" + parent.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="A.yaml", # same file as a.yaml on case-insensitive FS + routes=[RouteDef(to="$end")], + ), + ], + ) + + # Without inode-based detection, str(Path.resolve()) differs for + # A.yaml vs a.yaml on macOS, so the cycle would still be 
caught + # (just one level later, after both case variants are visited). + # Inode-based detection catches it on the first revisit and also + # correctly handles symlinks. Either way, the user sees a clear + # circular-reference error rather than hitting the depth limit. + with pytest.raises(ConfigurationError, match="circular sub-workflow reference"): + validate_workflow_config(config, workflow_path=parent) + + def test_validation_depth_limit_emits_warning(self, tmp_path: Path) -> None: + """Hitting the recursion depth limit emits a warning, not a silent pass. + + Builds a 12-level deep chain (parent → a0 → a1 → ... → a11) and + verifies the validator stops at depth ``_MAX_SUBWORKFLOW_VALIDATION_DEPTH`` + but emits a warning so the user knows validation was truncated. + """ + import textwrap + + from conductor.config.schema import LimitsConfig, RuntimeConfig + from conductor.config.validator import ( + _MAX_SUBWORKFLOW_VALIDATION_DEPTH, + validate_workflow_config, + ) + + # Build a deep linear chain a0 → a1 → a2 ... 
→ a{N+1} + depth = _MAX_SUBWORKFLOW_VALIDATION_DEPTH + 2 + for i in range(depth): + next_ref = f"./a{i + 1}.yaml" if i + 1 < depth else "$end" + if next_ref == "$end": + # Terminal sub-workflow: just a single agent + content = textwrap.dedent(f"""\ + workflow: + name: a{i} + entry_point: terminal + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: terminal + type: agent + prompt: done + routes: + - to: "$end" + output: {{}} + """) + else: + content = textwrap.dedent(f"""\ + workflow: + name: a{i} + entry_point: nested + runtime: + provider: copilot + limits: + max_iterations: 10 + agents: + - name: nested + type: workflow + workflow: {next_ref} + routes: + - to: "$end" + output: {{}} + """) + (tmp_path / f"a{i}.yaml").write_text(content, encoding="utf-8") + + parent = tmp_path / "parent.yaml" + parent.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="./a0.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + + warnings = validate_workflow_config(config, workflow_path=parent) + assert any("depth limit" in w for w in warnings), ( + f"Expected a depth-limit warning, got warnings: {warnings}" + ) diff --git a/tests/test_engine/test_subworkflow.py b/tests/test_engine/test_subworkflow.py index 2ae717b..b192de4 100644 --- a/tests/test_engine/test_subworkflow.py +++ b/tests/test_engine/test_subworkflow.py @@ -1477,10 +1477,16 @@ class TestRegistrySubWorkflowResolution: async def test_registry_ref_resolved_and_executed( self, tmp_workflow_dir: Path, tmp_path: Path ) -> None: - """Registry reference fetches workflow and executes it.""" - from unittest.mock import AsyncMock, patch + """Registry reference fetches workflow and executes it. 
+ - # Write a real sub-workflow to a temp cache location + Mocks only the registry config loader and ``fetch_workflow`` so that the + engine's real ``_resolve_subworkflow_path`` runs end-to-end: real + ``resolve_ref`` parses the ``analysis@team-a#v1.0.0`` syntax, real precedence + check confirms no local file shadows it, and the registry branch is taken. + """ + from unittest.mock import patch + + # Write a real cached sub-workflow to a temp location cached_sub = tmp_path / "sub.yaml" _write_yaml( cached_sub, @@ -1525,21 +1531,32 @@ async def test_registry_ref_resolved_and_executed( output={"result": "{{ sub_wf.output.result }}"}, ) - call_count = 0 + # Set up a real registry config so resolve_ref can find "team-a" + from conductor.registry.config import RegistriesConfig, RegistryEntry, RegistryType + + registry_config = RegistriesConfig( + registries={ + "team-a": RegistryEntry( + type=RegistryType.github, + source="https://github.com/example/team-a", + ), + }, + ) def mock_handler(agent, prompt, context): - nonlocal call_count - call_count += 1 return {"result": "registry-result"} from conductor.providers.copilot import CopilotProvider provider = CopilotProvider(mock_handler=mock_handler) - with patch( - "conductor.engine.workflow.WorkflowEngine._resolve_subworkflow_path", - new_callable=AsyncMock, - return_value=cached_sub, + # Patch the registry config loader (used by resolve_ref) and + # fetch_workflow (the network boundary). Real resolve_ref parses + # the ref string and looks up the registry; real + # _resolve_subworkflow_path is exercised end-to-end. 
+ with ( + patch("conductor.registry.resolver.load_config", return_value=registry_config), + patch("conductor.registry.cache.fetch_workflow", return_value=cached_sub), ): engine = WorkflowEngine(config, provider, workflow_path=parent_path) result = await engine.run({}) @@ -1655,10 +1672,16 @@ async def test_local_file_takes_precedence_over_registry(self, tmp_workflow_dir: provider = CopilotProvider(mock_handler=lambda agent, prompt, context: {"result": "local"}) - # No registry mock needed — should resolve purely via local file check - engine = WorkflowEngine(config, provider, workflow_path=parent_path) - result = await engine.run({}) + # Patch resolve_ref to verify it is NOT called when a local file + # exists — the precedence check must short-circuit before parsing. + from unittest.mock import patch + + with patch("conductor.registry.resolver.resolve_ref") as mock_resolve_ref: + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + result = await engine.run({}) + assert result.get("result") == "local" + mock_resolve_ref.assert_not_called() @pytest.mark.asyncio async def test_malformed_registry_ref_raises_execution_error(