diff --git a/AGENTS.md b/AGENTS.md index 4122628..7847ecf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -69,11 +69,13 @@ make validate-examples # validate all examples - `bg_runner.py` - Background process forking for `--web-bg` mode - `pid.py` - PID file utilities for tracking/stopping background processes - `update.py` - Update check, version comparison, and self-upgrade via `uv tool install` + - `pricing.py` - `conductor pricing path` subcommand: print where conductor reads user-level pricing from - **config/**: YAML loading and Pydantic schema validation - `schema.py` - Pydantic models for all workflow YAML structures (WorkflowConfig, AgentDef, ParallelGroup, ForEachDef, etc.) - `loader.py` - YAML parsing with environment variable resolution (${VAR:-default}) and `!file` tag support - `validator.py` - Cross-reference validation (agent names, routes, parallel groups) + - `user_pricing.py` - Loads optional machine-wide pricing overrides from `~/.conductor/pricing.yaml` (path overridable via `CONDUCTOR_PRICING_FILE`); missing file is silent, malformed file is a hard error - **engine/**: Workflow execution orchestration - `workflow.py` - Main `WorkflowEngine` class that orchestrates agent execution, parallel groups, for-each groups, and routing diff --git a/README.md b/README.md index 1e6524f..306b4ed 100644 --- a/README.md +++ b/README.md @@ -216,6 +216,62 @@ conductor validate **Full CLI documentation:** [docs/cli-reference.md](docs/cli-reference.md) +## Cost Tracking + +Conductor estimates per-agent and per-workflow costs from token usage and a +built-in pricing table (`src/conductor/engine/pricing.py`). When a model is +absent from the table, that agent's cost rolls up as `$0` rather than failing — +so for non-public, preview, or newly-released models you'll want to supply +pricing yourself. + +Pricing resolves in this order, highest precedence first: + +1. **Exact** match in workflow `runtime.cost.pricing` +2. **Exact** match in user `~/.conductor/pricing.yaml` +3. **Exact** match in built-in `DEFAULT_PRICING` +4. **Fuzzy** (versioned-suffix) match against built-in `DEFAULT_PRICING` + +User and workflow overrides are exact-match only. List each concrete model +name you want to override; family-name entries do not auto-cover descendants. + +### Per-workflow overrides + +```yaml +workflow: + cost: + show_summary: true + pricing: + claude-opus-4.7-high: + input_per_mtok: 15.00 + output_per_mtok: 75.00 + cache_read_per_mtok: 1.50 + cache_write_per_mtok: 18.75 +``` + +### Machine-wide overrides + +For pricing you want to apply to every workflow on a machine — typical for +preview models without published rates — drop a `~/.conductor/pricing.yaml`: + +```yaml +pricing: + claude-opus-4.7-high: + input_per_mtok: 15.00 + output_per_mtok: 75.00 + gpt-5.4: + input_per_mtok: 2.00 + output_per_mtok: 8.00 +``` + +* Missing file is silently OK. +* Malformed file is a hard error with a pointer to the path; bypass a broken + file by pointing `CONDUCTOR_PRICING_FILE` at a path that doesn't exist. +* Workflow entries always win for the same model name. +* Override entries are exact-match only — list each concrete model name + (e.g. `claude-opus-4-20250514`, not just `claude-opus-4`). + +Run `conductor pricing path` to print the resolved file location. + ## Workflow Registries Conductor supports named workflow registries — GitHub repos or local directories diff --git a/src/conductor/cli/app.py b/src/conductor/cli/app.py index 6a773cf..2ffa1b1 100644 --- a/src/conductor/cli/app.py +++ b/src/conductor/cli/app.py @@ -36,9 +36,11 @@ class ConsoleVerbosity(str, Enum): ) # Register subcommand groups +from conductor.cli.pricing import pricing_app # noqa: E402 from conductor.cli.registry import registry_app # noqa: E402 app.add_typer(registry_app) +app.add_typer(pricing_app) # Rich console for formatted output console = Console(stderr=True) diff --git a/src/conductor/cli/pricing.py b/src/conductor/cli/pricing.py new file mode 100644 index 0000000..ee7b8c0 --- /dev/null +++ b/src/conductor/cli/pricing.py @@ -0,0 +1,28 @@ +"""Typer subcommand group for inspecting pricing overrides.""" + +from __future__ import annotations + +import typer + +from conductor.config.user_pricing import USER_PRICING_ENV_VAR, get_user_pricing_path + +pricing_app = typer.Typer( + name="pricing", + help="Inspect machine-wide pricing overrides.", + no_args_is_help=True, +) + + +@pricing_app.command("path") +def path() -> None: + """Print the path conductor would read for user-level pricing. + + Honors the ``CONDUCTOR_PRICING_FILE`` environment variable. + """ + target = get_user_pricing_path() + typer.echo(str(target)) + if not target.exists(): + typer.echo( + f"(File does not exist; create it or set {USER_PRICING_ENV_VAR}.)", + err=True, + ) diff --git a/src/conductor/config/user_pricing.py b/src/conductor/config/user_pricing.py new file mode 100644 index 0000000..a469683 --- /dev/null +++ b/src/conductor/config/user_pricing.py @@ -0,0 +1,112 @@ +"""User-level (machine-wide) pricing overrides. + +Loads optional pricing overrides from ``~/.conductor/pricing.yaml`` (or the +path in ``CONDUCTOR_PRICING_FILE``, with ``~`` expansion). A missing file +returns an empty mapping; a malformed file raises ``ConfigurationError`` +with a pointer to the path — silent acceptance of corrupted overrides +would re-introduce the "your costs are wrong and you don't know" bug this +exists to solve. +""" + +from __future__ import annotations + +import logging +import os +from pathlib import Path + +from pydantic import ValidationError +from ruamel.yaml import YAML +from ruamel.yaml.error import YAMLError + +from conductor.config.schema import PricingOverride +from conductor.engine.pricing import ModelPricing +from conductor.exceptions import ConfigurationError + +logger = logging.getLogger(__name__) + +USER_PRICING_ENV_VAR = "CONDUCTOR_PRICING_FILE" + + +def get_user_pricing_path() -> Path: + """Return the path conductor would read for user-level pricing. + + Honors ``CONDUCTOR_PRICING_FILE`` (with ``~`` expansion) when set; + otherwise returns ``~/.conductor/pricing.yaml``. + """ + raw = os.environ.get(USER_PRICING_ENV_VAR) + if raw: + return Path(os.path.expanduser(raw)) + return Path.home() / ".conductor" / "pricing.yaml" + + +def load_user_pricing(path: Path | None = None) -> dict[str, ModelPricing]: + """Load machine-wide pricing overrides from the user file. + + Returns an empty mapping when the file does not exist. Raises + ``ConfigurationError`` when the file exists but is unreadable, invalid + YAML, or fails ``PricingOverride`` schema validation. + """ + target = path if path is not None else get_user_pricing_path() + + if not target.exists(): + return {} + + try: + data = YAML(typ="safe").load(target.read_text(encoding="utf-8")) + except (OSError, YAMLError) as e: + raise ConfigurationError( + f"Failed to load user pricing file '{target}': {e}", + suggestion=( + "Fix the file, delete it, or temporarily bypass it by setting " + "CONDUCTOR_PRICING_FILE to a path that does not exist " + "(e.g. CONDUCTOR_PRICING_FILE=/dev/null)." + ), + file_path=str(target), + ) from e + + if data is None: + return {} + + if not isinstance(data, dict) or "pricing" not in data: + raise ConfigurationError( + f"User pricing file '{target}' must contain a top-level `pricing:` mapping.", + file_path=str(target), + ) + + raw_entries = data["pricing"] + if raw_entries is None: + return {} + if not isinstance(raw_entries, dict): + raise ConfigurationError( + f"User pricing file '{target}' has a `pricing:` value of type " + f"{type(raw_entries).__name__}; expected a mapping of model name to " + f"pricing override.", + file_path=str(target), + ) + + overrides: dict[str, ModelPricing] = {} + for model_name, entry in raw_entries.items(): + if not isinstance(model_name, str): + raise ConfigurationError( + f"User pricing file '{target}' has a non-string model key " + f"{model_name!r}; model names must be strings.", + file_path=str(target), + ) + try: + override = PricingOverride.model_validate(entry) + except ValidationError as e: + raise ConfigurationError( + f"Invalid pricing entry for {model_name!r} in '{target}': {e}", + file_path=str(target), + ) from e + overrides[model_name] = ModelPricing( + input_per_mtok=override.input_per_mtok, + output_per_mtok=override.output_per_mtok, + cache_read_per_mtok=override.cache_read_per_mtok, + cache_write_per_mtok=override.cache_write_per_mtok, + ) + + if overrides: + logger.info("Loaded %d user pricing override(s) from %s", len(overrides), target) + + return overrides diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 5c965cc..d951f8d 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -428,27 +428,57 @@ def _workflow_dir(self) -> Path | None: return Path(self.workflow_path).resolve().parent if self.workflow_path else None def _build_pricing_overrides(self) -> dict[str, ModelPricing] | None: - """Build pricing overrides from workflow cost configuration. + """Build the merged pricing-override table for this workflow. - Converts PricingOverride Pydantic models from the workflow config - into ModelPricing dataclasses for use by the UsageTracker. + Layers two sources, with workflow entries always winning: + + user file (``~/.conductor/pricing.yaml``) → workflow ``runtime.cost.pricing`` + + The merged dict is then handed to ``UsageTracker``, which passes it + to ``get_pricing(overrides=...)`` — and ``get_pricing`` checks + overrides before the built-in ``DEFAULT_PRICING`` table, so user + and workflow entries also beat the defaults. + + A missing user file is silently OK (steady state for users who + haven't opted in). A malformed user file raises ``ConfigurationError`` + from ``load_user_pricing``. Returns: - Dictionary mapping model names to ModelPricing, or None if no overrides. + Mapping of model name to ``ModelPricing``, or ``None`` when + neither layer contributes any entries. """ - cost_config = self.config.workflow.cost - if not cost_config.pricing: - return None + from conductor.config.user_pricing import load_user_pricing + + user_overrides = load_user_pricing() - overrides: dict[str, ModelPricing] = {} + cost_config = self.config.workflow.cost + workflow_overrides: dict[str, ModelPricing] = {} for model_name, pricing_override in cost_config.pricing.items(): - overrides[model_name] = ModelPricing( + workflow_overrides[model_name] = ModelPricing( input_per_mtok=pricing_override.input_per_mtok, output_per_mtok=pricing_override.output_per_mtok, cache_read_per_mtok=pricing_override.cache_read_per_mtok, cache_write_per_mtok=pricing_override.cache_write_per_mtok, ) - return overrides + + if not user_overrides and not workflow_overrides: + return None + + # Workflow on top: dict union right-wins, so any model present in + # both layers gets the workflow value. + merged = {**user_overrides, **workflow_overrides} + + # Surface shadowed user entries at DEBUG so users debugging an + # unexpected cost don't have to read source to understand precedence. + if user_overrides and workflow_overrides: + shadowed = sorted(set(user_overrides) & set(workflow_overrides)) + for model in shadowed: + logger.debug( + "Workflow `runtime.cost.pricing` overrides user pricing for model %r", + model, + ) + + return merged def _emit(self, event_type: str, data: dict[str, Any]) -> None: """Emit a workflow event if an emitter is configured. diff --git a/tests/test_cli/test_pricing_cmd.py b/tests/test_cli/test_pricing_cmd.py new file mode 100644 index 0000000..4e2cd31 --- /dev/null +++ b/tests/test_cli/test_pricing_cmd.py @@ -0,0 +1,46 @@ +"""Tests for ``conductor pricing`` CLI.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from conductor.cli.app import app +from conductor.config.user_pricing import USER_PRICING_ENV_VAR + +runner = CliRunner() + + +def test_path_prints_default(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + monkeypatch.delenv(USER_PRICING_ENV_VAR, raising=False) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + result = runner.invoke(app, ["pricing", "path"]) + assert result.exit_code == 0 + assert str(tmp_path / ".conductor" / "pricing.yaml") in result.stdout + + +def test_path_honors_env_var(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + custom = tmp_path / "elsewhere.yaml" + monkeypatch.setenv(USER_PRICING_ENV_VAR, str(custom)) + result = runner.invoke(app, ["pricing", "path"]) + assert result.exit_code == 0 + assert str(custom) in result.stdout + + +def test_path_warns_when_missing(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + custom = tmp_path / "absent.yaml" + monkeypatch.setenv(USER_PRICING_ENV_VAR, str(custom)) + result = runner.invoke(app, ["pricing", "path"]) + assert result.exit_code == 0 + # stderr is merged into output for CliRunner by default in older click, + # otherwise read it from .stderr. result.output is the full stream. + combined = result.stdout + (result.stderr if hasattr(result, "stderr") else "") + assert "does not exist" in combined + + +def test_no_args_shows_help() -> None: + result = runner.invoke(app, ["pricing"]) + assert result.exit_code != 0 # typer convention for no_args_is_help + assert "path" in result.stdout diff --git a/tests/test_config/test_user_pricing.py b/tests/test_config/test_user_pricing.py new file mode 100644 index 0000000..133389b --- /dev/null +++ b/tests/test_config/test_user_pricing.py @@ -0,0 +1,195 @@ +"""Tests for ``conductor.config.user_pricing``.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from conductor.config.user_pricing import ( + USER_PRICING_ENV_VAR, + get_user_pricing_path, + load_user_pricing, +) +from conductor.exceptions import ConfigurationError + +# --- get_user_pricing_path --------------------------------------------------- + + +def test_default_path_under_home(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + monkeypatch.delenv(USER_PRICING_ENV_VAR, raising=False) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + assert get_user_pricing_path() == tmp_path / ".conductor" / "pricing.yaml" + + +def test_env_var_overrides_default(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + custom = tmp_path / "custom.yaml" + monkeypatch.setenv(USER_PRICING_ENV_VAR, str(custom)) + assert get_user_pricing_path() == custom + + +def test_env_var_expands_tilde(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + # ``~`` is expanded by os.path.expanduser, which honors HOME on Unix + # and USERPROFILE on Windows. + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("USERPROFILE", str(tmp_path)) + monkeypatch.setenv(USER_PRICING_ENV_VAR, "~/custom.yaml") + assert get_user_pricing_path() == tmp_path / "custom.yaml" + + +# --- load_user_pricing ------------------------------------------------------- + + +def test_missing_file_returns_empty(tmp_path: Path) -> None: + assert load_user_pricing(tmp_path / "absent.yaml") == {} + + +def test_empty_file_returns_empty(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text("", encoding="utf-8") + assert load_user_pricing(target) == {} + + +def test_pricing_none_returns_empty(tmp_path: Path) -> None: + """`pricing:` with no value parses as None — treat as no entries.""" + target = tmp_path / "p.yaml" + target.write_text("pricing:\n", encoding="utf-8") + assert load_user_pricing(target) == {} + + +def test_loads_entries(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text( + "pricing:\n" + " custom-model:\n" + " input_per_mtok: 1.5\n" + " output_per_mtok: 4.5\n" + " cache_read_per_mtok: 0.15\n" + " cache_write_per_mtok: 1.875\n", + encoding="utf-8", + ) + overrides = load_user_pricing(target) + assert set(overrides) == {"custom-model"} + pricing = overrides["custom-model"] + assert pricing.input_per_mtok == 1.5 + assert pricing.output_per_mtok == 4.5 + assert pricing.cache_read_per_mtok == 0.15 + assert pricing.cache_write_per_mtok == 1.875 + + +def test_cache_fields_default_to_zero(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text( + "pricing:\n m:\n input_per_mtok: 1\n output_per_mtok: 2\n", + encoding="utf-8", + ) + pricing = load_user_pricing(target)["m"] + assert pricing.cache_read_per_mtok == 0.0 + assert pricing.cache_write_per_mtok == 0.0 + + +def test_invalid_yaml_raises_with_path(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text("pricing:\n bad: : :", encoding="utf-8") + with pytest.raises(ConfigurationError) as exc: + load_user_pricing(target) + assert str(target) in str(exc.value) + + +def test_top_level_not_mapping_raises(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text("- just a list\n", encoding="utf-8") + with pytest.raises(ConfigurationError, match="top-level `pricing:`"): + load_user_pricing(target) + + +def test_missing_pricing_key_raises(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text("other: value\n", encoding="utf-8") + with pytest.raises(ConfigurationError, match="top-level `pricing:`"): + load_user_pricing(target) + + +def test_negative_price_rejected(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text( + "pricing:\n m:\n input_per_mtok: -1\n output_per_mtok: 2\n", + encoding="utf-8", + ) + with pytest.raises(ConfigurationError, match="'m'"): + load_user_pricing(target) + + +def test_missing_required_field_rejected(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text( + "pricing:\n m:\n input_per_mtok: 1\n", + encoding="utf-8", + ) + with pytest.raises(ConfigurationError, match="'m'"): + load_user_pricing(target) + + +def test_pricing_value_bool_rejected(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text("pricing: true\n", encoding="utf-8") + with pytest.raises(ConfigurationError, match="bool"): + load_user_pricing(target) + + +def test_pricing_value_list_rejected(tmp_path: Path) -> None: + """An empty list at `pricing:` should not silently no-op — the user + almost certainly meant a mapping.""" + target = tmp_path / "p.yaml" + target.write_text("pricing: []\n", encoding="utf-8") + with pytest.raises(ConfigurationError, match="list"): + load_user_pricing(target) + + +def test_non_string_model_key_rejected(tmp_path: Path) -> None: + target = tmp_path / "p.yaml" + target.write_text( + "pricing:\n 123:\n input_per_mtok: 1\n output_per_mtok: 2\n", + encoding="utf-8", + ) + with pytest.raises(ConfigurationError, match="non-string"): + load_user_pricing(target) + + +def test_load_error_suggests_bypass(tmp_path: Path) -> None: + """The hard-error path must surface the CONDUCTOR_PRICING_FILE escape hatch.""" + target = tmp_path / "p.yaml" + target.write_text("pricing:\n bad: : :", encoding="utf-8") + with pytest.raises(ConfigurationError) as exc: + load_user_pricing(target) + assert "CONDUCTOR_PRICING_FILE" in str(exc.value) + + +def test_load_uses_env_var_when_no_path(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + target = tmp_path / "via-env.yaml" + target.write_text( + "pricing:\n m:\n input_per_mtok: 1\n output_per_mtok: 2\n", + encoding="utf-8", + ) + monkeypatch.setenv(USER_PRICING_ENV_VAR, str(target)) + assert "m" in load_user_pricing() + + +def test_unreadable_file_raises(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + target = tmp_path / "p.yaml" + target.write_text("pricing: {}\n", encoding="utf-8") + + def _boom(*_args: object, **_kwargs: object) -> str: + raise OSError("permission denied") + + monkeypatch.setattr(Path, "read_text", _boom) + with pytest.raises(ConfigurationError, match="permission denied"): + load_user_pricing(target) + + +# Ensure the env-var test cleanup is bulletproof in CI. +def test_env_var_unset_after_each_test(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv(USER_PRICING_ENV_VAR, raising=False) + import os + + assert USER_PRICING_ENV_VAR not in os.environ diff --git a/tests/test_engine/test_pricing_layering.py b/tests/test_engine/test_pricing_layering.py new file mode 100644 index 0000000..1785eba --- /dev/null +++ b/tests/test_engine/test_pricing_layering.py @@ -0,0 +1,97 @@ +"""Tests for ``WorkflowEngine._build_pricing_overrides`` layering.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from conductor.config.schema import ( + AgentDef, + CostConfig, + PricingOverride, + WorkflowConfig, + WorkflowDef, +) +from conductor.config.user_pricing import USER_PRICING_ENV_VAR +from conductor.engine.workflow import WorkflowEngine + + +def _make_engine( + workflow_pricing: dict[str, PricingOverride] | None = None, +) -> WorkflowEngine: + config = WorkflowConfig( + workflow=WorkflowDef( + name="t", + entry_point="a", + cost=CostConfig(pricing=workflow_pricing or {}), + ), + agents=[AgentDef(name="a", model="gpt-4o", prompt="hi")], + ) + return WorkflowEngine(config=config, provider=MagicMock()) + + +@pytest.fixture +def user_pricing_file(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> Path: + """Provide an isolated user-pricing file via CONDUCTOR_PRICING_FILE.""" + target = tmp_path / "user.yaml" + monkeypatch.setenv(USER_PRICING_ENV_VAR, str(target)) + return target + + +def test_no_overrides_returns_none(user_pricing_file: Path) -> None: + engine = _make_engine() + assert engine._build_pricing_overrides() is None + + +def test_user_only(user_pricing_file: Path) -> None: + user_pricing_file.write_text( + "pricing:\n custom:\n input_per_mtok: 1\n output_per_mtok: 2\n", + encoding="utf-8", + ) + overrides = _make_engine()._build_pricing_overrides() + assert overrides is not None + assert overrides["custom"].input_per_mtok == 1.0 + + +def test_workflow_only(user_pricing_file: Path) -> None: + overrides = _make_engine( + workflow_pricing={"wf": PricingOverride(input_per_mtok=10, output_per_mtok=20)}, + )._build_pricing_overrides() + assert overrides is not None + assert overrides["wf"].input_per_mtok == 10.0 + + +def test_workflow_overrides_user_for_same_model(user_pricing_file: Path) -> None: + user_pricing_file.write_text( + "pricing:\n m:\n input_per_mtok: 1\n output_per_mtok: 2\n", + encoding="utf-8", + ) + overrides = _make_engine( + workflow_pricing={"m": PricingOverride(input_per_mtok=99, output_per_mtok=99)}, + )._build_pricing_overrides() + assert overrides is not None + assert overrides["m"].input_per_mtok == 99.0 + + +def test_distinct_models_merge(user_pricing_file: Path) -> None: + user_pricing_file.write_text( + "pricing:\n user-only:\n input_per_mtok: 1\n output_per_mtok: 2\n", + encoding="utf-8", + ) + overrides = _make_engine( + workflow_pricing={ + "wf-only": PricingOverride(input_per_mtok=10, output_per_mtok=20), + }, + )._build_pricing_overrides() + assert overrides is not None + assert set(overrides) == {"user-only", "wf-only"} + + +def test_malformed_user_file_raises(user_pricing_file: Path) -> None: + user_pricing_file.write_text("pricing:\n bad: : :", encoding="utf-8") + from conductor.exceptions import ConfigurationError + + with pytest.raises(ConfigurationError): + _make_engine()