Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
demo-strategus-cohort-incidence/*
conda-pip-env-github-workflow.yml
*sandbox*
*.doit.db*
*data*
Expand Down
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,47 @@ export STUDY_AGENT_MCP_COMMAND=study-agent-mcp
export STUDY_AGENT_MCP_ARGS=""
study-agent-acp
```
Note: This starts MCP via stdio. If you use MCP over HTTP, do not set `STUDY_AGENT_MCP_COMMAND`.
Note: Prefer stopping the ACP process (SIGINT/SIGTERM) so the MCP subprocess is closed cleanly. Killing the MCP directly can leave defunct processes.
Note: ACP uses a threaded HTTP server by default. Set `STUDY_AGENT_THREADING=0` to disable threading.
Note: `/health` includes MCP preflight details under `mcp_index` when MCP is configured.
Troubleshooting: run `python mcp_server/scripts/mcp_probe.py` to verify index paths and search without ACP.

### MCP over HTTP (recommended for cross-platform stability)

Start MCP as a separate HTTP service:

```bash
export MCP_TRANSPORT=http
export MCP_HOST=127.0.0.1
export MCP_PORT=8790
export MCP_PATH=/mcp
study-agent-mcp
```

Then point ACP at it:

```bash
export STUDY_AGENT_MCP_URL="http://127.0.0.1:8790/mcp"
study-agent-acp
```
Note: `STUDY_AGENT_MCP_URL` must include the port (e.g. `:8790`). When set, ACP uses HTTP and ignores `STUDY_AGENT_MCP_COMMAND`.

PowerShell (Windows) quickstart:

```powershell
$env:MCP_TRANSPORT = "http"
$env:MCP_HOST = "127.0.0.1"
$env:MCP_PORT = "8790"
$env:MCP_PATH = "/mcp"
study-agent-mcp
```

```powershell
$env:STUDY_AGENT_MCP_URL = "http://127.0.0.1:8790/mcp"
study-agent-acp
```

2. Run `phenotype_recommendation`
```bash
curl -s -X POST http://127.0.0.1:8765/flows/phenotype_recommendation \
Expand Down
5 changes: 5 additions & 0 deletions acp_agent/study_agent_acp/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ def run_phenotype_intent_split_flow(
return {"status": "error", "error": "missing study_intent"}
if self._mcp_client is None:
return {"status": "error", "error": "MCP client unavailable"}
debug = os.getenv("STUDY_AGENT_DEBUG", "0") == "1"

prompt_bundle = self.call_tool(
name="phenotype_intent_split",
Expand All @@ -292,7 +293,11 @@ def run_phenotype_intent_split_flow(
output_schema=prompt_full.get("output_schema", {}),
study_intent=study_intent,
)
if debug:
print("ACP DEBUG > phenotype_intent_split: calling LLM")
llm_result = call_llm(prompt)
if debug:
print("ACP DEBUG > phenotype_intent_split: LLM returned")
if llm_result is None:
return {
"status": "error",
Expand Down
17 changes: 13 additions & 4 deletions acp_agent/study_agent_acp/llm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import os
import time
import urllib.error
import urllib.request
from typing import Any, Dict, Optional
Expand Down Expand Up @@ -222,14 +223,17 @@ def call_llm(prompt: str) -> Optional[Dict[str, Any]]:
model = os.getenv("LLM_MODEL", "agentstudyassistant")
timeout = int(os.getenv("LLM_TIMEOUT", "180"))
log_enabled = os.getenv("LLM_LOG", "0") == "1"
log_prompt = os.getenv("LLM_LOG_PROMPT", "0") == "1"
log_response = os.getenv("LLM_LOG_RESPONSE", "0") == "1"
log_json = os.getenv("LLM_LOG_JSON", "0") == "1"
dry_run = os.getenv("LLM_DRY_RUN", "0") == "1"
use_responses = os.getenv("LLM_USE_RESPONSES", "0") == "1"

if log_enabled:
print(f"LLM CONFIG > url={api_url} model={model} timeout={timeout} responses={use_responses}")

if dry_run:
if log_enabled:
if log_enabled or log_prompt:
print("LLM DRY RUN > skipping API call")
print("LLM OUTGOING PROMPT >", prompt)
return None
Expand Down Expand Up @@ -257,15 +261,18 @@ def call_llm(prompt: str) -> Optional[Dict[str, Any]]:
request.add_header("Content-Type", "application/json")
request.add_header("Authorization", f"Bearer {api_key}")

if log_enabled:
if log_enabled or log_prompt:
print("LLM OUTGOING PROMPT >", prompt)

try:
start = time.time()
with urllib.request.urlopen(request, timeout=timeout) as response:
raw = response.read().decode("utf-8")
if log_enabled:
print(f"LLM TIMING > seconds={time.time() - start:.2f}")
except urllib.error.HTTPError as exc:
raw = exc.read().decode("utf-8")
if log_enabled:
if log_enabled or log_response:
print(f"LLM HTTP ERROR > {exc.code}")
print("LLM ERROR BODY >", raw)
return None
Expand All @@ -274,13 +281,15 @@ def call_llm(prompt: str) -> Optional[Dict[str, Any]]:
print(f"LLM ERROR > {exc}")
return None

if log_enabled:
if log_enabled or log_response:
print("LLM RAW RESPONSE >", raw)

try:
data = json.loads(raw)
except json.JSONDecodeError:
return _extract_json_object(raw)
if log_json:
print("LLM JSON >", data)

if use_responses:
output_text = None
Expand Down
112 changes: 112 additions & 0 deletions acp_agent/study_agent_acp/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
from contextlib import AsyncExitStack
from dataclasses import dataclass
import os
import socket
from threading import Lock
from typing import Any, Dict, List, Optional

import anyio
import httpx
from urllib.parse import urlparse
from anyio.from_thread import start_blocking_portal
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client
from mcp.client.streamable_http import streamable_http_client
from mcp.shared._httpx_utils import create_mcp_http_client


@dataclass
Expand Down Expand Up @@ -184,3 +189,110 @@ def _should_use_oneshot(exc: Exception) -> bool:
if "GeneratorContextManager" in message:
return True
return False


@dataclass
class HttpMCPClientConfig:
url: str
token: Optional[str] = None
timeout: int = 30


class HttpMCPClient:
def __init__(self, config: HttpMCPClientConfig) -> None:
if "://" not in config.url:
config.url = f"http://{config.url}"
self._config = config
parsed = urlparse(config.url)
self._host = parsed.hostname
self._port = parsed.port
self._lock = Lock()
self._portal = None
self._portal_cm = None
self._session: ClientSession | None = None
self._exit_stack: AsyncExitStack | None = None

def list_tools(self) -> List[Dict[str, Any]]:
self._ensure_session()
assert self._portal is not None
return self._portal.call(self._list_tools)

def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
self._ensure_session()
assert self._portal is not None
return self._portal.call(self._call_tool, name, arguments)

def health_check(self) -> Dict[str, Any]:
if self._host and self._port:
try:
with socket.create_connection((self._host, self._port), timeout=1):
return {"ok": True, "mode": "http"}
except OSError as exc:
return {"ok": False, "error": str(exc)}
try:
self._ensure_session()
assert self._portal is not None
return self._portal.call(self._ping)
except Exception as exc:
return {"ok": False, "error": str(exc)}

async def _list_tools(self) -> List[Dict[str, Any]]:
assert self._session is not None
result = await self._session.list_tools()
return [tool.model_dump() for tool in result.tools]

async def _call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
assert self._session is not None
result = await self._session.call_tool(name=name, arguments=arguments)
if result.structuredContent is not None:
return result.structuredContent
return {"content": [c.model_dump() for c in result.content or []]}

async def _ping(self) -> Dict[str, Any]:
assert self._session is not None
await self._session.send_ping()
return {"ok": True, "mode": "http"}

def _ensure_session(self) -> None:
if self._session is not None:
return
with self._lock:
if self._session is not None:
return
self._portal_cm = start_blocking_portal()
self._portal = self._portal_cm.__enter__()
assert self._portal is not None
self._portal.call(self._async_init)

async def _async_init(self) -> None:
headers = {}
if self._config.token:
headers["Authorization"] = f"Bearer {self._config.token}"
timeout = httpx.Timeout(self._config.timeout)
client = create_mcp_http_client(headers=headers, timeout=timeout)
self._exit_stack = AsyncExitStack()
await self._exit_stack.enter_async_context(client)
read_stream, write_stream, _ = await self._exit_stack.enter_async_context(
streamable_http_client(self._config.url, http_client=client)
)
session = ClientSession(read_stream, write_stream)
await self._exit_stack.enter_async_context(session)
await session.initialize()
self._session = session

def close(self) -> None:
if self._portal is None:
return
try:
self._portal.call(self._async_close)
finally:
if self._portal_cm is not None:
self._portal_cm.__exit__(None, None, None)
self._portal_cm = None
self._portal = None

async def _async_close(self) -> None:
if self._exit_stack is not None:
await self._exit_stack.aclose()
self._exit_stack = None
self._session = None
36 changes: 28 additions & 8 deletions acp_agent/study_agent_acp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Dict, Optional

from .agent import StudyAgent
from .mcp_client import StdioMCPClient, StdioMCPClientConfig
from .mcp_client import HttpMCPClient, HttpMCPClientConfig, StdioMCPClient, StdioMCPClientConfig

SERVICES = [
{"name": "phenotype_recommendation", "endpoint": "/flows/phenotype_recommendation"},
Expand Down Expand Up @@ -70,7 +70,7 @@ def _load_registry_services() -> tuple[list[Dict[str, Any]], list[str]]:

class ACPRequestHandler(BaseHTTPRequestHandler):
agent: StudyAgent
mcp_client: Optional[StdioMCPClient]
mcp_client: Optional[object]
debug: bool = False

def log_message(self, format: str, *args: Any) -> None:
Expand Down Expand Up @@ -383,9 +383,14 @@ def _build_agent(
mcp_args: Optional[list[str]],
allow_core_fallback: bool,
mcp_cwd: Optional[str],
) -> tuple[StudyAgent, Optional[StdioMCPClient]]:
mcp_url: Optional[str],
mcp_token: Optional[str],
mcp_timeout: int,
) -> tuple[StudyAgent, Optional[object]]:
mcp_client = None
if mcp_command:
if mcp_url:
mcp_client = HttpMCPClient(HttpMCPClientConfig(url=mcp_url, token=mcp_token, timeout=mcp_timeout))
elif mcp_command:
mcp_client = StdioMCPClient(
StdioMCPClientConfig(command=mcp_command, args=mcp_args or [], cwd=mcp_cwd),
)
Expand Down Expand Up @@ -457,8 +462,15 @@ def main(host: str = "127.0.0.1", port: int = 8765) -> None:
debug = os.getenv("STUDY_AGENT_DEBUG", "0") == "1"
threaded = os.getenv("STUDY_AGENT_THREADING", "1") == "1"
mcp_cwd = os.getenv("STUDY_AGENT_MCP_CWD") or os.getcwd()

if mcp_command:
mcp_url = os.getenv("STUDY_AGENT_MCP_URL")
mcp_token = os.getenv("STUDY_AGENT_MCP_TOKEN")
mcp_timeout = int(os.getenv("STUDY_AGENT_MCP_TIMEOUT", "30"))

if mcp_url:
if "://" in mcp_url and ":" not in mcp_url.split("://", 1)[1]:
raise RuntimeError("STUDY_AGENT_MCP_URL missing port (e.g., http://127.0.0.1:8790/mcp).")
print(f"ACP INFO > MCP url={mcp_url}")
elif mcp_command:
if os.getenv("PHENOTYPE_INDEX_DIR") is None:
print("ACP WARN > PHENOTYPE_INDEX_DIR not set; MCP will use its default.")
if os.getenv("EMBED_URL") is None:
Expand All @@ -468,7 +480,15 @@ def main(host: str = "127.0.0.1", port: int = 8765) -> None:
print(f"ACP INFO > MCP cwd={mcp_cwd}")

args_list = [arg for arg in mcp_args.split(" ") if arg]
agent, mcp_client = _build_agent(mcp_command, args_list, allow_core_fallback, mcp_cwd)
agent, mcp_client = _build_agent(
mcp_command,
args_list,
allow_core_fallback,
mcp_cwd,
mcp_url,
mcp_token,
mcp_timeout,
)

class Handler(ACPRequestHandler):
agent = None
Expand Down Expand Up @@ -504,7 +524,7 @@ def _shutdown(signum, frame) -> None:
_serve(server, mcp_client)


def _serve(server: HTTPServer, mcp_client: Optional[StdioMCPClient]) -> None:
def _serve(server: HTTPServer, mcp_client: Optional[object]) -> None:
try:
server.serve_forever()
finally:
Expand Down
3 changes: 3 additions & 0 deletions docs/PHENOTYPE_RECOMMENDATION_DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ Candidate selection:
19. `STUDY_AGENT_PORT` (default `8765`)
20. `STUDY_AGENT_MCP_CWD` (optional) working directory passed to MCP subprocesses. Use for stable relative paths.
21. `MCP_LOG_LEVEL` (default `INFO`) controls MCP stderr logging (`DEBUG|INFO|WARN|ERROR|OFF`).
22. `STUDY_AGENT_MCP_URL` (optional) HTTP MCP endpoint. When set, ACP uses HTTP and ignores `STUDY_AGENT_MCP_COMMAND`.
23. `STUDY_AGENT_MCP_TOKEN` (optional) bearer token passed to MCP over HTTP.
24. `STUDY_AGENT_MCP_TIMEOUT` (default `30`) HTTP MCP request timeout in seconds.

**Risks and Mitigations**
1. Missing dependencies for FAISS
Expand Down
Loading