Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions docs/design/registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,119 @@ file of the same name.
| SemVer ranges | Out of scope v1 | Adds resolver complexity for marginal benefit until ecosystems exist. |
| Index format | YAML primary, JSON fallback | Consistent with workflow files. JSON tolerated for tooling. |

## Ad-hoc references

A **workflow reference without pre-configured registry** allows teams to compose
workflows across GitHub organizations and repositories without registry setup.

### Motivation

Configured registries are ideal for standard repos (org-wide workflows, team
templates). But ad-hoc cross-team composition is common: Team C wants to run a
workflow from Team A's repo in combination with Team B's workflow, without any
team having to register each other's repos. Ad-hoc references lower friction for
one-off usage.

### Syntax

```
workflow@owner/repo[#ref]
```

If the part after `@` contains `/`, it is treated as a literal `owner/repo`
GitHub reference (ad-hoc) and fetched directly. Otherwise it is looked up as a
configured registry name (existing behavior).

Examples:

```
analysis@myorg/team-a # default branch HEAD of myorg/team-a
analysis@myorg/team-a#v1.0.0 # tag v1.0.0 of myorg/team-a
analysis@myorg/team-a#main # main branch of myorg/team-a
analysis@myorg/team-a#abc1234 # specific commit SHA
```

### Disambiguation rule

At parse time, Conductor disambiguates between ad-hoc and registry references:

- `analysis@team` → registry name `team` (no `/` in the part after `@`)
- `analysis@myorg/team-a` → ad-hoc reference to `myorg/team-a` (contains `/`)

Registry names are configured by the user and cannot contain `/`, so there is
no ambiguity. Both forms coexist: configured registries are recommended for
frequently-used sources, ad-hoc references for occasional cross-team pulls.

### Caching

Ad-hoc workflows are cached at:

```
~/.conductor/cache/registries/_adhoc/<owner>/<repo>/<workflow>/<sha[:12]>/
```

This isolates ad-hoc caches from named registries, avoiding collisions when the
same workflow name exists in different sources.

### Reference resolution

Ad-hoc references follow the same resolution rules as registry references:

- Missing `#<ref>` → use the **default branch HEAD** (re-resolved on each fetch).
- Explicit `#<tag>` or `#<branch>` → pinned to that tag or the current HEAD of the branch.
- Explicit `#<sha>` → pinned to an exact commit.
- Multiple `@` or multiple `#` are hard errors.

### Authentication

Ad-hoc references use the same authentication as named GitHub registries:
- Public repos work automatically.
- Private repos use `gh auth token` if available, otherwise fail with a clear error.

### Usage

Ad-hoc references work everywhere registry references work:

```bash
conductor run 'analysis@myorg/team-a#v1.0.0' --input question="..."
conductor validate 'analysis@myorg/team-a#main'
conductor resume 'analysis@myorg/team-a#v1.0.0'
```

As a sub-workflow (see [Sub-workflows](#sub-workflows) in the workflow syntax guide):

```yaml
agents:
- name: team_a_analysis
type: workflow
workflow: analysis@myorg/team-a#v1.0.0
input_mapping:
data: "{{ workflow.input.raw_data }}"
```

### Example: Cross-team composition

Team C's workflow references Team A's and Team B's workflows without any
pre-registry setup:

```yaml
agents:
- name: team_a_pipeline
type: workflow
workflow: qa-bot@teamA/qa-workflows#main
input_mapping:
question: "{{ workflow.input.query }}"

- name: team_b_pipeline
type: workflow
workflow: reviewer@teamB/review-workflows#v2.1.0
input_mapping:
content: "{{ team_a_pipeline.output.answer }}"
```

Both workflows are fetched and composed in Team C's workflow without any
registry configuration.

## Open questions

- **Sibling fetch scope for GitHub.** Should we fetch only files in the
Expand Down
32 changes: 31 additions & 1 deletion docs/workflow-syntax.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,10 @@ agents:

**Key semantics:**

- The `workflow` path is resolved relative to the parent workflow file
- The `workflow` field can be:
- A local file path: `./research-pipeline.yaml` (resolved relative to the parent)
- A configured registry reference: `qa-bot@team#v1.2.3` (see [Workflow Registry](design/registry.md))
- An ad-hoc GitHub reference: `analysis@myorg/team-a#main` (owner/repo fetched directly from GitHub)
- Sub-workflow inherits the parent's provider configuration
- Sub-workflow output is stored in context and accessible via `{{ agent_name.output.field }}`
- Recursive composition is supported (sub-workflows can reference other sub-workflows) with a global depth limit of `MAX_SUBWORKFLOW_DEPTH = 10`
Expand All @@ -288,6 +291,33 @@ prompt: |
{{ deep_research.output.findings }}
```

**Workflow reference types** — the `workflow` field supports three forms:

```yaml
agents:
# Local file path (relative to parent workflow)
- name: local_pipeline
type: workflow
workflow: ./shared/research-pipeline.yaml

# Configured registry reference
- name: registry_pipeline
type: workflow
workflow: qa-bot@team#v1.2.3

# Ad-hoc GitHub reference (no registry setup required)
- name: adhoc_pipeline
type: workflow
workflow: analysis@myorg/team-a#main
input_mapping:
data: "{{ workflow.input.raw_data }}"
```

The ad-hoc form (`workflow@owner/repo[#ref]`) allows cross-team workflow
composition without pre-configuring registries. See
[Ad-hoc References](design/registry.md#ad-hoc-references) in the registry design
doc for details on caching, authentication, and ref resolution.

**Sub-workflows in `for_each` groups** — `type: workflow` agents can be used inside `for_each` groups to fan out one sub-workflow run per item in the source array. Each iteration receives its own `input_mapping` evaluated against the loop variable, and emits its own `subworkflow_started` / `subworkflow_completed` events:

```yaml
Expand Down
58 changes: 8 additions & 50 deletions src/conductor/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,25 +359,12 @@ def run(
import asyncio
import json

from conductor.registry.cache import fetch_workflow as fetch_registry_workflow
from conductor.registry.cache import resolve_and_fetch
from conductor.registry.errors import RegistryError
from conductor.registry.resolver import resolve_ref

try:
ref = resolve_ref(workflow)
if ref.kind == "file":
assert ref.path is not None
workflow_path = ref.path
else:
assert ref.registry_name is not None
assert ref.registry_entry is not None
assert ref.workflow is not None
workflow_path = fetch_registry_workflow(
registry_name=ref.registry_name,
registry_entry=ref.registry_entry,
workflow_name=ref.workflow,
ref=ref.ref,
)
workflow_path = resolve_and_fetch(resolve_ref(workflow))
except RegistryError as e:
print_error(e)
raise typer.Exit(code=1) from None
Expand Down Expand Up @@ -507,25 +494,12 @@ def validate(
conductor validate ./examples/my-workflow.yaml
conductor validate qa-bot@team@1.0.0
"""
from conductor.registry.cache import fetch_workflow as fetch_registry_workflow
from conductor.registry.cache import resolve_and_fetch
from conductor.registry.errors import RegistryError
from conductor.registry.resolver import resolve_ref

try:
ref = resolve_ref(workflow)
if ref.kind == "file":
assert ref.path is not None
workflow_path = ref.path
else:
assert ref.registry_name is not None
assert ref.registry_entry is not None
assert ref.workflow is not None
workflow_path = fetch_registry_workflow(
registry_name=ref.registry_name,
registry_entry=ref.registry_entry,
workflow_name=ref.workflow,
ref=ref.ref,
)
workflow_path = resolve_and_fetch(resolve_ref(workflow))
except RegistryError as e:
print_error(e)
raise typer.Exit(code=1) from None
Expand Down Expand Up @@ -563,7 +537,7 @@ def show(
conductor show qa-bot
conductor show qa-bot@my-registry@1.0.0
"""
from conductor.registry.cache import fetch_workflow as fetch_registry_workflow
from conductor.registry.cache import resolve_and_fetch
from conductor.registry.errors import RegistryError
from conductor.registry.resolver import resolve_ref

Expand All @@ -576,15 +550,7 @@ def show(
console.print(f"[bold red]Error:[/bold red] Workflow file not found: {workflow}")
raise typer.Exit(code=1)
else:
assert ref.registry_name is not None
assert ref.registry_entry is not None
assert ref.workflow is not None
workflow_path = fetch_registry_workflow(
registry_name=ref.registry_name,
registry_entry=ref.registry_entry,
workflow_name=ref.workflow,
ref=ref.ref,
)
workflow_path = resolve_and_fetch(ref)
except RegistryError as e:
print_error(e)
raise typer.Exit(code=1) from None
Expand Down Expand Up @@ -819,7 +785,7 @@ def resume(
# Resolve workflow ref if provided
resolved_workflow: Path | None = None
if workflow is not None:
from conductor.registry.cache import fetch_workflow as fetch_registry_workflow
from conductor.registry.cache import resolve_and_fetch
from conductor.registry.errors import RegistryError
from conductor.registry.resolver import resolve_ref

Expand All @@ -834,15 +800,7 @@ def resume(
)
raise typer.Exit(code=1)
else:
assert ref.registry_name is not None
assert ref.registry_entry is not None
assert ref.workflow is not None
resolved_workflow = fetch_registry_workflow(
registry_name=ref.registry_name,
registry_entry=ref.registry_entry,
workflow_name=ref.workflow,
ref=ref.ref,
)
resolved_workflow = resolve_and_fetch(ref)
except RegistryError as e:
print_error(e)
raise typer.Exit(code=1) from None
Expand Down
20 changes: 5 additions & 15 deletions src/conductor/config/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@ def _resolve_subworkflow_ref_for_validation(
Returns:
Tuple of (resolved path or None on error, list of error strings).
"""
from conductor.registry.cache import resolve_and_fetch
from conductor.registry.errors import RegistryError
from conductor.registry.resolver import resolve_ref

Expand All @@ -926,23 +927,12 @@ def _resolve_subworkflow_ref_for_validation(
errors.append(f"{label}: sub-workflow file not found: '{candidate}'")
return None, errors

# Registry reference: fetch (uses cache; makes network request on first access).
from conductor.registry.cache import fetch_workflow

# registry_name, registry_entry, and workflow are always set when kind == "registry"
assert resolved.registry_name is not None # noqa: S101
assert resolved.registry_entry is not None # noqa: S101
assert resolved.workflow is not None # noqa: S101

# Named registry or ad-hoc reference: fetch (uses cache; makes network
# request on first access).
try:
sub_path = fetch_workflow(
resolved.registry_name,
resolved.registry_entry,
resolved.workflow,
resolved.ref,
)
sub_path = resolve_and_fetch(resolved)
except RegistryError as exc:
errors.append(f"{label}: failed to fetch registry sub-workflow '{workflow_ref}': {exc}")
errors.append(f"{label}: failed to fetch sub-workflow '{workflow_ref}': {exc}")
return None, errors

return sub_path, errors
Expand Down
31 changes: 10 additions & 21 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ async def _resolve_subworkflow_path(
ExecutionError: If the registry reference is malformed, names an
unknown registry, or the registry fetch fails.
"""
from conductor.registry.cache import resolve_and_fetch
from conductor.registry.errors import RegistryError
from conductor.registry.resolver import resolve_ref

Expand All @@ -759,16 +760,17 @@ async def _resolve_subworkflow_path(
if candidate.is_file():
return candidate

# Step 2: heuristic parse file-looking path or registry reference?
# Step 2: parse as file-path / named-registry / ad-hoc reference.
try:
resolved = resolve_ref(agent_workflow)
except RegistryError as exc:
raise ExecutionError(
f"Failed to resolve sub-workflow '{agent_workflow}' "
f"(referenced by agent '{agent_name}'): {exc}",
suggestion=(
"Check the registry reference syntax and ensure the registry "
"is configured (run 'conductor registry list')."
"Check the registry reference syntax. For named registries, "
"ensure the registry is configured (run 'conductor registry list'). "
"For ad-hoc references, use 'workflow@owner/repo[#ref]'."
),
) from exc

Expand All @@ -777,29 +779,16 @@ async def _resolve_subworkflow_path(
# candidate path so the caller emits a clear "file not found" error.
return candidate

# Step 4: registry reference — fetch (or return cached) local path.
from conductor.registry.cache import fetch_workflow

# registry_name, registry_entry, and workflow are always set when kind == "registry"
assert resolved.registry_name is not None # noqa: S101
assert resolved.registry_entry is not None # noqa: S101
assert resolved.workflow is not None # noqa: S101

# Step 4: dispatch to the unified fetcher (handles registry + adhoc).
try:
return await asyncio.to_thread(
fetch_workflow,
resolved.registry_name,
resolved.registry_entry,
resolved.workflow,
resolved.ref,
)
return await asyncio.to_thread(resolve_and_fetch, resolved)
except RegistryError as exc:
raise ExecutionError(
f"Failed to fetch registry sub-workflow '{agent_workflow}' "
f"Failed to fetch sub-workflow '{agent_workflow}' "
f"(referenced by agent '{agent_name}'): {exc}",
suggestion=(
"Check that the registry name and workflow name are correct "
"and the registry is reachable."
"Check that the registry/repo and workflow name are correct "
"and the source is reachable."
),
) from exc

Expand Down
Loading
Loading