Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions sdk/python/src/agentspan/agents/_internal/provider_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,25 @@ def get_provider_spec(provider_name: str) -> Optional[ProviderSpec]:
Returns ``None`` if the provider is not in the registry.
"""
return PROVIDER_REGISTRY.get(provider_name)


# Provider API-key environment variables that the runtime syncs into the
# local server's credential store on boot. This set mirrors the server-side
# CredentialEnvSeeder.KNOWN_ENV_VARS list in the Java code base; whenever one
# list changes, update the other to match.
KNOWN_PROVIDER_ENV_VARS: frozenset[str] = frozenset(
    (
        "ANTHROPIC_API_KEY",
        "AZURE_OPENAI_API_KEY",
        "COHERE_API_KEY",
        "DEEPSEEK_API_KEY",
        "GEMINI_API_KEY",
        "GOOGLE_API_KEY",
        "GROQ_API_KEY",
        "HUGGINGFACE_API_KEY",
        "MISTRAL_API_KEY",
        "OPENAI_API_KEY",
        "PERPLEXITY_API_KEY",
        "TOGETHER_API_KEY",
        "XAI_API_KEY",
    )
)
186 changes: 179 additions & 7 deletions sdk/python/src/agentspan/agents/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,49 @@
logger = logging.getLogger("agentspan.agents.runtime")


def _is_local_server(server_url: str) -> bool:
"""Return True if *server_url* points to a loopback address."""
from urllib.parse import urlparse

if not server_url:
return False
host = (urlparse(server_url).hostname or "").lower()
return host in ("localhost", "127.0.0.1", "::1", "0.0.0.0")


def _sync_marker_path():
"""Filesystem location of the env-sync marker file.

Maps ``server_url`` → ``{"instance_id", "env_hash"}`` so we sync iff the
server JVM has restarted **or** the local env values have changed.
Module-level so tests can monkeypatch it to a tmp_path.
"""
from pathlib import Path

return Path.home() / ".agentspan" / "sync-marker.json"


def _compute_env_hash() -> str:
    """Return a stable SHA-256 hex digest over all known provider env vars.

    The digest is stored in the sync marker so that a change to any provider
    env var between SDK invocations is detected even when the server JVM has
    not restarted. Without it, a user who fixes a mistyped API key in their
    shell could not get the corrected value into the server short of
    restarting the JVM.

    Variables are hashed in sorted-name order as ``NAME=value\\n`` records,
    so the result does not depend on set iteration order.
    """
    import hashlib

    from agentspan.agents._internal.provider_registry import KNOWN_PROVIDER_ENV_VARS

    hasher = hashlib.sha256()
    for var_name in sorted(KNOWN_PROVIDER_ENV_VARS):
        # Unset and empty variables both contribute an empty value.
        var_value = os.environ.get(var_name) or ""
        hasher.update(f"{var_name}={var_value}\n".encode("utf-8"))
    return hasher.hexdigest()


_RETRY_POLICY_MAP = {
"fixed": "FIXED",
"linear_backoff": "LINEAR_BACKOFF",
Expand Down Expand Up @@ -378,6 +421,12 @@ def __init__(

logger.info("AgentRuntime initialized (server=%s)", self._config.server_url)

# Push shell env vars into the local server's credential store so a
# corrected key reaches the running JVM without a restart. Skipped
# for remote — would clobber UI-managed credentials.
if _is_local_server(self._config.server_url):
self._sync_provider_env_to_server()

# ── Sync/async bridge ────────────────────────────────────────────

@staticmethod
Expand Down Expand Up @@ -2072,13 +2121,26 @@ def _ensure_model(self, model_string: str) -> None:

self._integration_api_available = True
except Exception as e:
if self._integration_api_available is None:
# First failure — likely OSS Conductor without integration API
logger.warning(
"Integration API not available (OSS Conductor?). "
"Auto-registration disabled: %s",
e,
)
first_failure = self._integration_api_available is None
# First failure → assume OSS Conductor (no integration API); push
# the key via /api/credentials instead. Later failures are per-model.
if first_failure:
try:
self._push_credential_to_server(spec.api_key_env, api_key)
logger.info(
"Integration API not available (OSS Conductor?). "
"Pushed %s via /api/credentials instead: %s",
spec.api_key_env,
e,
)
except Exception as cred_err:
logger.warning(
"Auto-registration failed for '%s' on both integration "
"API (%s) and credentials API (%s).",
model_string,
e,
cred_err,
)
self._integration_api_available = False
else:
logger.warning(
Expand All @@ -2089,6 +2151,116 @@ def _ensure_model(self, model_string: str) -> None:

self._ensured_models.add(model_string)

# ── Credential push (Agentspan-native /api/credentials) ────────────

def _push_credential_to_server(self, name: str, value: str) -> None:
    """Upsert one credential on the Agentspan server with an HTTP PUT.

    The request goes to ``<server_url>/credentials/<name>`` with a JSON body
    of ``{"value": value}`` and a 5 second timeout. It serves as the fallback
    path when the Conductor integration API is unavailable (e.g. the local
    OSS server).

    Authentication headers are taken from the runtime config: a bearer token
    when ``api_key`` is set, otherwise ``X-Auth-Key`` (plus ``X-Auth-Secret``
    when a secret is configured), otherwise none.

    Args:
        name: Credential name, used as the last URL path segment.
        value: Credential value to store.

    Raises:
        Whatever ``httpx`` raises for a transport failure or a non-success
        status (via ``raise_for_status``); callers decide whether to swallow
        it.
    """
    import httpx

    config = self._config
    endpoint = f"{config.server_url.rstrip('/')}/credentials/{name}"

    auth_headers: Dict[str, str]
    if config.api_key:
        auth_headers = {"Authorization": f"Bearer {config.api_key}"}
    elif config.auth_key:
        auth_headers = {"X-Auth-Key": config.auth_key}
        if config.auth_secret:
            auth_headers["X-Auth-Secret"] = config.auth_secret
    else:
        auth_headers = {}

    response = httpx.put(
        endpoint,
        json={"value": value},
        headers=auth_headers,
        timeout=5.0,
    )
    response.raise_for_status()

def _sync_provider_env_to_server(self) -> None:
    """Push every known provider env var into the server's credential store.

    The sync is gated by the ``(instance_id, env_hash)`` pair cached per
    ``server_url`` in the sync marker file. It is skipped only when BOTH
    match, meaning the same server JVM and the same shell env that were
    already synced. It runs again when the server restarts OR a provider
    env var changes, even within the same JVM.

    Individual push failures are logged at debug level and swallowed. The
    marker is recorded only when every attempted push succeeded; after a
    failed push any existing entry for this server is dropped, so the next
    runtime start retries instead of treating the failed sync as done.

    When the server's ``instance_id`` cannot be determined, the sync is
    attempted unconditionally and the marker is left untouched.
    """
    from agentspan.agents._internal.provider_registry import KNOWN_PROVIDER_ENV_VARS

    server_url = self._config.server_url
    current_instance = self._fetch_server_instance_id()
    current_env_hash = _compute_env_hash()

    marker = self._read_sync_marker()
    if not isinstance(marker, dict):
        # Valid JSON that is not an object (list, null, ...) is unusable as a
        # marker; start from an empty one rather than failing on .get().
        marker = {}
    entry = marker.get(server_url)

    # A legacy entry (bare instance_id string) is not a dict, so it never
    # matches and forces a re-sync.
    if (
        current_instance is not None
        and isinstance(entry, dict)
        and entry.get("instance_id") == current_instance
        and entry.get("env_hash") == current_env_hash
    ):
        logger.debug("Skipping env sync — instance %s and env unchanged.", current_instance)
        return

    all_synced = True
    for name in sorted(KNOWN_PROVIDER_ENV_VARS):
        value = os.environ.get(name)
        if not value:
            continue
        try:
            self._push_credential_to_server(name, value)
        except Exception as e:
            # Best-effort: one failing credential must not abort the others
            # or break runtime construction.
            all_synced = False
            logger.debug("Could not sync %s into local server: %s", name, e)
        else:
            logger.debug("Synced %s into local server credential store", name)

    if current_instance is None:
        # Without an instance id there is nothing meaningful to cache.
        return

    if all_synced:
        marker[server_url] = {
            "instance_id": current_instance,
            "env_hash": current_env_hash,
        }
    elif marker.pop(server_url, None) is None:
        # Sync was incomplete and there is no stale entry to clear.
        return

    self._write_sync_marker(marker)

def _fetch_server_instance_id(self) -> "Optional[str]":
    """Return the running server's ``instance_id``, or ``None`` on any error.

    Issues ``GET <server_url>/info`` with a 2 second timeout and reads the
    ``instance_id`` field of the JSON response. Any failure (transport
    error, non-success status, unparseable body) is logged at debug level
    and reported as ``None``. Older servers without the info endpoint
    therefore yield ``None``, and the caller falls back to an unconditional,
    best-effort sync.
    """
    import httpx

    info_url = self._config.server_url.rstrip("/") + "/info"
    try:
        response = httpx.get(info_url, timeout=2.0)
        response.raise_for_status()
        payload = response.json()
        return payload.get("instance_id")
    except Exception as e:
        logger.debug("Could not fetch /api/info: %s", e)
    return None

@staticmethod
def _read_sync_marker() -> Dict[str, Any]:
    """Return the marker file as a dict; empty dict if missing or corrupt.

    "Corrupt" covers an unreadable file, invalid JSON, and valid JSON whose
    top-level value is not an object (a list, ``null``, a string, ...). The
    result is therefore always a dict that callers can safely ``.get()`` on.

    Values can be either ``{"instance_id", "env_hash"}`` dicts (current
    schema) or bare instance_id strings (legacy). The legacy form is
    treated as a non-match by the caller, so it forces a re-sync.
    """
    path = _sync_marker_path()
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberately broad: the marker is only a cache, so any read or
        # parse problem simply means "nothing recorded yet".
        return {}
    return data if isinstance(data, dict) else {}

@staticmethod
def _write_sync_marker(marker: Dict[str, Any]) -> None:
    """Persist *marker* as JSON at the sync-marker path.

    The marker file's parent directory is created first when it does not
    exist. Writing is best-effort: any failure (directory creation,
    serialization, or the write itself) is logged at debug level and
    swallowed.
    """
    marker_file = _sync_marker_path()
    try:
        marker_file.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps(marker)
        marker_file.write_text(serialized)
    except Exception as e:
        logger.debug("Could not write sync marker: %s", e)

def _ensure_models_for_agent(self, agent: Agent) -> None:
"""Walk the agent tree and ensure all referenced models are registered."""
seen: set = set()
Expand Down
Loading
Loading