Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ make validate-examples # validate all examples
- `bg_runner.py` - Background process forking for `--web-bg` mode
- `pid.py` - PID file utilities for tracking/stopping background processes
- `update.py` - Update check, version comparison, and self-upgrade via `uv tool install`
- `pricing.py` - `conductor pricing path` subcommand: print where conductor reads user-level pricing from

- **config/**: YAML loading and Pydantic schema validation
- `schema.py` - Pydantic models for all workflow YAML structures (WorkflowConfig, AgentDef, ParallelGroup, ForEachDef, etc.)
- `loader.py` - YAML parsing with environment variable resolution (${VAR:-default}) and `!file` tag support
- `validator.py` - Cross-reference validation (agent names, routes, parallel groups)
- `user_pricing.py` - Loads optional machine-wide pricing overrides from `~/.conductor/pricing.yaml` (path overridable via `CONDUCTOR_PRICING_FILE`); missing file is silent, malformed file is a hard error

- **engine/**: Workflow execution orchestration
- `workflow.py` - Main `WorkflowEngine` class that orchestrates agent execution, parallel groups, for-each groups, and routing
Expand Down
56 changes: 56 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,62 @@ conductor validate <workflow.yaml>

**Full CLI documentation:** [docs/cli-reference.md](docs/cli-reference.md)

## Cost Tracking

Conductor estimates per-agent and per-workflow costs from token usage and a
built-in pricing table (`src/conductor/engine/pricing.py`). When a model is
absent from the table, that agent's cost rolls up as `$0` rather than failing —
so for non-public, preview, or newly-released models you'll want to supply
pricing yourself.

Pricing resolves in this order, highest precedence first:

1. **Exact** match in workflow `runtime.cost.pricing`
2. **Exact** match in user `~/.conductor/pricing.yaml`
3. **Exact** match in built-in `DEFAULT_PRICING`
4. **Fuzzy** (versioned-suffix) match against built-in `DEFAULT_PRICING`

User and workflow overrides are exact-match only. List each concrete model
name you want to override; family-name entries do not auto-cover descendants.

### Per-workflow overrides

```yaml
workflow:
cost:
show_summary: true
pricing:
claude-opus-4.7-high:
input_per_mtok: 15.00
output_per_mtok: 75.00
cache_read_per_mtok: 1.50
cache_write_per_mtok: 18.75
```

### Machine-wide overrides

For pricing you want to apply to every workflow on a machine — typical for
preview models without published rates — drop a `~/.conductor/pricing.yaml`:

```yaml
pricing:
claude-opus-4.7-high:
input_per_mtok: 15.00
output_per_mtok: 75.00
gpt-5.4:
input_per_mtok: 2.00
output_per_mtok: 8.00
```

* Missing file is silently OK.
* Malformed file is a hard error with a pointer to the path; bypass a broken
file by pointing `CONDUCTOR_PRICING_FILE` at a path that doesn't exist.
* Workflow entries always win for the same model name.
* Override entries are exact-match only — list each concrete model name
(e.g. `claude-opus-4-20250514`, not just `claude-opus-4`).

Run `conductor pricing path` to print the resolved file location.

## Workflow Registries

Conductor supports named workflow registries — GitHub repos or local directories
Expand Down
2 changes: 2 additions & 0 deletions src/conductor/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ class ConsoleVerbosity(str, Enum):
)

# Register subcommand groups
from conductor.cli.pricing import pricing_app # noqa: E402
from conductor.cli.registry import registry_app # noqa: E402

app.add_typer(registry_app)
app.add_typer(pricing_app)

# Rich console for formatted output
console = Console(stderr=True)
Expand Down
28 changes: 28 additions & 0 deletions src/conductor/cli/pricing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Typer subcommand group for inspecting pricing overrides."""

from __future__ import annotations

import typer

from conductor.config.user_pricing import USER_PRICING_ENV_VAR, get_user_pricing_path

pricing_app = typer.Typer(
name="pricing",
help="Inspect machine-wide pricing overrides.",
no_args_is_help=True,
)


@pricing_app.command("path")
def path() -> None:
"""Print the path conductor would read for user-level pricing.

Honors the ``CONDUCTOR_PRICING_FILE`` environment variable.
"""
target = get_user_pricing_path()
typer.echo(str(target))
if not target.exists():
typer.echo(
f"(File does not exist; create it or set {USER_PRICING_ENV_VAR}.)",
err=True,
)
112 changes: 112 additions & 0 deletions src/conductor/config/user_pricing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""User-level (machine-wide) pricing overrides.

Loads optional pricing overrides from ``~/.conductor/pricing.yaml`` (or the
path in ``CONDUCTOR_PRICING_FILE``, with ``~`` expansion). A missing file
returns an empty mapping; a malformed file raises ``ConfigurationError``
with a pointer to the path — silent acceptance of corrupted overrides
would re-introduce the "your costs are wrong and you don't know" bug this
exists to solve.
"""

from __future__ import annotations

import logging
import os
from pathlib import Path

from pydantic import ValidationError
from ruamel.yaml import YAML
from ruamel.yaml.error import YAMLError

from conductor.config.schema import PricingOverride
from conductor.engine.pricing import ModelPricing
from conductor.exceptions import ConfigurationError

logger = logging.getLogger(__name__)

USER_PRICING_ENV_VAR = "CONDUCTOR_PRICING_FILE"


def get_user_pricing_path() -> Path:
"""Return the path conductor would read for user-level pricing.

Honors ``CONDUCTOR_PRICING_FILE`` (with ``~`` expansion) when set;
otherwise returns ``~/.conductor/pricing.yaml``.
"""
raw = os.environ.get(USER_PRICING_ENV_VAR)
if raw:
return Path(os.path.expanduser(raw))
return Path.home() / ".conductor" / "pricing.yaml"


def load_user_pricing(path: Path | None = None) -> dict[str, ModelPricing]:
"""Load machine-wide pricing overrides from the user file.

Returns an empty mapping when the file does not exist. Raises
``ConfigurationError`` when the file exists but is unreadable, invalid
YAML, or fails ``PricingOverride`` schema validation.
"""
target = path if path is not None else get_user_pricing_path()

if not target.exists():
return {}

try:
data = YAML(typ="safe").load(target.read_text(encoding="utf-8"))
except (OSError, YAMLError) as e:
raise ConfigurationError(
f"Failed to load user pricing file '{target}': {e}",
suggestion=(
"Fix the file, delete it, or temporarily bypass it by setting "
"CONDUCTOR_PRICING_FILE to a path that does not exist "
"(e.g. CONDUCTOR_PRICING_FILE=/dev/null)."
),
file_path=str(target),
) from e

if data is None:
return {}

if not isinstance(data, dict) or "pricing" not in data:
raise ConfigurationError(
f"User pricing file '{target}' must contain a top-level `pricing:` mapping.",
file_path=str(target),
)

raw_entries = data["pricing"]
if raw_entries is None:
return {}
if not isinstance(raw_entries, dict):
raise ConfigurationError(
f"User pricing file '{target}' has a `pricing:` value of type "
f"{type(raw_entries).__name__}; expected a mapping of model name to "
f"pricing override.",
file_path=str(target),
)

overrides: dict[str, ModelPricing] = {}
for model_name, entry in raw_entries.items():
if not isinstance(model_name, str):
raise ConfigurationError(
f"User pricing file '{target}' has a non-string model key "
f"{model_name!r}; model names must be strings.",
file_path=str(target),
)
try:
override = PricingOverride.model_validate(entry)
except ValidationError as e:
raise ConfigurationError(
f"Invalid pricing entry for {model_name!r} in '{target}': {e}",
file_path=str(target),
) from e
overrides[model_name] = ModelPricing(
input_per_mtok=override.input_per_mtok,
output_per_mtok=override.output_per_mtok,
cache_read_per_mtok=override.cache_read_per_mtok,
cache_write_per_mtok=override.cache_write_per_mtok,
)

if overrides:
logger.info("Loaded %d user pricing override(s) from %s", len(overrides), target)

return overrides
50 changes: 40 additions & 10 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,27 +428,57 @@ def _workflow_dir(self) -> Path | None:
return Path(self.workflow_path).resolve().parent if self.workflow_path else None

def _build_pricing_overrides(self) -> dict[str, ModelPricing] | None:
"""Build pricing overrides from workflow cost configuration.
"""Build the merged pricing-override table for this workflow.

Converts PricingOverride Pydantic models from the workflow config
into ModelPricing dataclasses for use by the UsageTracker.
Layers two sources, with workflow entries always winning:

user file (``~/.conductor/pricing.yaml``) → workflow ``runtime.cost.pricing``

The merged dict is then handed to ``UsageTracker``, which passes it
to ``get_pricing(overrides=...)`` — and ``get_pricing`` checks
overrides before the built-in ``DEFAULT_PRICING`` table, so user
and workflow entries also beat the defaults.

A missing user file is silently OK (steady state for users who
haven't opted in). A malformed user file raises ``ConfigurationError``
from ``load_user_pricing``.

Returns:
Dictionary mapping model names to ModelPricing, or None if no overrides.
Mapping of model name to ``ModelPricing``, or ``None`` when
neither layer contributes any entries.
"""
cost_config = self.config.workflow.cost
if not cost_config.pricing:
return None
from conductor.config.user_pricing import load_user_pricing

user_overrides = load_user_pricing()

overrides: dict[str, ModelPricing] = {}
cost_config = self.config.workflow.cost
workflow_overrides: dict[str, ModelPricing] = {}
for model_name, pricing_override in cost_config.pricing.items():
overrides[model_name] = ModelPricing(
workflow_overrides[model_name] = ModelPricing(
input_per_mtok=pricing_override.input_per_mtok,
output_per_mtok=pricing_override.output_per_mtok,
cache_read_per_mtok=pricing_override.cache_read_per_mtok,
cache_write_per_mtok=pricing_override.cache_write_per_mtok,
)
return overrides

if not user_overrides and not workflow_overrides:
return None

# Workflow on top: dict union right-wins, so any model present in
# both layers gets the workflow value.
merged = {**user_overrides, **workflow_overrides}

# Surface shadowed user entries at DEBUG so users debugging an
# unexpected cost don't have to read source to understand precedence.
if user_overrides and workflow_overrides:
shadowed = sorted(set(user_overrides) & set(workflow_overrides))
for model in shadowed:
logger.debug(
"Workflow `runtime.cost.pricing` overrides user pricing for model %r",
model,
)

return merged

def _emit(self, event_type: str, data: dict[str, Any]) -> None:
"""Emit a workflow event if an emitter is configured.
Expand Down
46 changes: 46 additions & 0 deletions tests/test_cli/test_pricing_cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Tests for ``conductor pricing`` CLI."""

from __future__ import annotations

from pathlib import Path

import pytest
from typer.testing import CliRunner

from conductor.cli.app import app
from conductor.config.user_pricing import USER_PRICING_ENV_VAR

runner = CliRunner()


def test_path_prints_default(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
monkeypatch.delenv(USER_PRICING_ENV_VAR, raising=False)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
result = runner.invoke(app, ["pricing", "path"])
assert result.exit_code == 0
assert str(tmp_path / ".conductor" / "pricing.yaml") in result.stdout


def test_path_honors_env_var(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
custom = tmp_path / "elsewhere.yaml"
monkeypatch.setenv(USER_PRICING_ENV_VAR, str(custom))
result = runner.invoke(app, ["pricing", "path"])
assert result.exit_code == 0
assert str(custom) in result.stdout


def test_path_warns_when_missing(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
custom = tmp_path / "absent.yaml"
monkeypatch.setenv(USER_PRICING_ENV_VAR, str(custom))
result = runner.invoke(app, ["pricing", "path"])
assert result.exit_code == 0
# stderr is merged into output for CliRunner by default in older click,
# otherwise read it from .stderr. result.output is the full stream.
combined = result.stdout + (result.stderr if hasattr(result, "stderr") else "")
assert "does not exist" in combined


def test_no_args_shows_help() -> None:
result = runner.invoke(app, ["pricing"])
assert result.exit_code != 0 # typer convention for no_args_is_help
assert "path" in result.stdout
Loading
Loading