diff --git a/examples/blackbox_recipes/mini_swe_agent/Dockerfile.mini-swe-agent-tool b/examples/blackbox_recipes/mini_swe_agent/Dockerfile.mini-swe-agent-tool new file mode 100644 index 00000000..a2fba565 --- /dev/null +++ b/examples/blackbox_recipes/mini_swe_agent/Dockerfile.mini-swe-agent-tool @@ -0,0 +1,45 @@ +# Mini-swe-agent sidecar tool image. +# +# Contains a self-contained Python venv at /opt/mini-swe-agent with +# mini-swe-agent + litellm installed. When mounted into a sandbox at +# /opt/mini-swe-agent, the agent can be invoked via: +# +# /opt/mini-swe-agent/bin/python /opt/mini-swe-agent/bin/run_agent.py ... +# +# Uses python-build-standalone for maximum portability across different +# glibc versions (built against older glibc, forward-compatible). +# +# Build: +# docker build -f Dockerfile.mini-swe-agent-tool -t mini-swe-agent-tool:latest . +# + +FROM debian:bullseye-slim AS builder + +ARG PBS_RELEASE="20260602" +ARG PBS_PYTHON="3.12.13" +ARG PIP_INDEX_URL="" + +# Download and extract python-build-standalone (stripped, 32MB) +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates wget \ + && rm -rf /var/lib/apt/lists/* \ + && wget -q \ + "https://github.com/astral-sh/python-build-standalone/releases/download/${PBS_RELEASE}/cpython-${PBS_PYTHON}%2B${PBS_RELEASE}-x86_64-unknown-linux-gnu-install_only_stripped.tar.gz" \ + -O /tmp/python.tar.gz \ + && mkdir -p /opt/mini-swe-agent \ + && tar -xzf /tmp/python.tar.gz -C /opt/mini-swe-agent --strip-components=1 \ + && rm /tmp/python.tar.gz + +# Install mini-swe-agent + litellm +RUN /opt/mini-swe-agent/bin/pip install --no-cache-dir \ + ${PIP_INDEX_URL:+-i ${PIP_INDEX_URL}} \ + "mini-swe-agent==2.2.8" \ + "litellm==1.81.7" + +# Copy the in-sandbox runner script +COPY run_agent.py /opt/mini-swe-agent/bin/run_agent.py + +# Final scratch image: files are at the image root level so that when +# akernel_sdk.Mount(target="/opt/mini-swe-agent") overlays this image, +# the files appear at 
/opt/mini-swe-agent/bin/python etc. +FROM scratch +COPY --from=builder /opt/mini-swe-agent / diff --git a/examples/blackbox_recipes/mini_swe_agent/README.md b/examples/blackbox_recipes/mini_swe_agent/README.md new file mode 100644 index 00000000..436d3c65 --- /dev/null +++ b/examples/blackbox_recipes/mini_swe_agent/README.md @@ -0,0 +1,119 @@ +# Mini-SWE-Agent In-Sandbox Execution + +## Overview + +`mini-swe-agent` runs inside the SWE-bench sandbox through a sidecar tool image. +The external runner creates the sandbox, mounts the tool image at +`/opt/mini-swe-agent`, starts the agent process, and evaluates the reward in the +same sandbox. + +The agent executes commands through `LocalEnvironment` (local bash) inside the +sandbox and calls the LLM through the gateway URL passed in via stdin. The +`mini_swe` tool image uses +[python-build-standalone](https://github.com/astral-sh/python-build-standalone) +to build an isolated Python environment, then copies the result into a minimal +`FROM scratch` final stage, so the sandbox base image does not need to provide +Python for the sidecar tool runtime. + +**This recipe is self-contained.** It shares only +[`../sandbox_client.py`](../sandbox_client.py) with the claude-code recipe; +everything else (`dataset.py`, `reward.py`, `run_agent.py`, `build_tool.sh`, +`run_train.sh`, config) lives in this directory and does not depend on +`claude_code/`. 
+ +**Supported runners:** + +| runner | Description | +|--------|-------------| +| `mini_swe` | mini-swe-agent sidecar runner | + +**Supported sandbox types:** + +| Type | Description | +|------|-------------| +| openyuanrong | Uses `akernel_sdk.Mount` and `sandbox.commands.run()` | + +## Architecture + +```text +[Rollouter Host: mini_swe_agent_runner] + | + |-- SandboxClient.create(image, sidecar_image, sidecar_target="/opt/mini-swe-agent") + | `-- akernel: Sandbox(mounts=[Mount(target="/opt/mini-swe-agent", ...)]) + | + |-- sandbox.run("") + | `-- [Inside Sandbox] + | /opt/mini-swe-agent/bin/python /opt/mini-swe-agent/bin/run_agent.py + | stdin <- task config JSON (task, gateway_url, agent) + | commands run inside the SWE-bench sandbox + | stdout -> agent execution result JSON + | + |-- parse agent result + |-- SandboxEnvForReward(sandbox) -> evaluate_in_env() + `-- POST session.reward_info_url +``` + +## Prerequisites + +1. **AKernel** — set `AKERNEL_SERVER_ADDRESS` and `AKERNEL_TOKEN`. +2. **Tool image** — build the mini-swe-agent tool image and push it to a remote + registry if the sandbox service cannot access local Docker images. + +## 1. Build Tool Image + +`mini_swe` is injected into the SWE-bench sandbox as a sidecar tool image. Use +`build_tool.sh` to build it. + +| Default tool image | Dockerfile | Sandbox mount path | Image contents | +|--------------------|------------|--------------------|----------------| +| `mini-swe-agent-tool:latest` | `Dockerfile.mini-swe-agent-tool` | `/opt/mini-swe-agent` | Standalone Python 3.12, `mini-swe-agent`, `litellm`, and `run_agent.py` | + +```bash +# Use the default PyPI source. +bash examples/blackbox_recipes/mini_swe_agent/build_tool.sh + +# Use a custom PyPI mirror. +bash examples/blackbox_recipes/mini_swe_agent/build_tool.sh --pip-index https://pypi.tuna.tsinghua.edu.cn/simple/ + +# Build and push to a remote registry. 
+bash examples/blackbox_recipes/mini_swe_agent/build_tool.sh --registry swr.cn-east-3.myhuaweicloud.com/openyuanrong +``` + +The `mini_swe` Python runtime is fully isolated from the sandbox container's +Python. + +### Build Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `TOOL_IMAGE` | `mini-swe-agent-tool` | Image name | +| `TOOL_TAG` | `latest` | Image tag | +| `PIP_INDEX_URL` | unset, use PyPI | pip index URL (`--pip-index`) | + +After pushing, point training at it with `SWE_AGENT_TOOL_IMAGE`. + +## 2. Training (Fully Async) + +```bash +AKERNEL_SERVER_ADDRESS="6.2.179.37:8888" \ +AKERNEL_TOKEN="" \ +SWE_AGENT_TOOL_IMAGE=swr.cn-east-3.myhuaweicloud.com/openyuanrong/mini-swe-agent-tool:latest \ +MODEL_PATH=~/models/Qwen3.5-9B \ +bash examples/blackbox_recipes/mini_swe_agent/run_train.sh +``` + +The training YAML keeps `mini_swe` as the only runner: + +```yaml +agent_runner_fqn: examples.blackbox_recipes.mini_swe_agent.mini_swe_agent_runner.mini_swe_agent_runner +``` + +## 3. 
#!/usr/bin/env bash
# Build the mini-swe-agent sidecar tool image.
#
# The image uses python-build-standalone to build an isolated Python runtime
# with mini-swe-agent + litellm + run_agent.py, copied into a minimal
# `FROM scratch` final stage rooted at /opt/mini-swe-agent. It is mounted into
# the SWE-bench sandbox at /opt/mini-swe-agent, so the sandbox base image does
# not need Python for the sidecar tool runtime.
#
# Usage:
#   bash examples/blackbox_recipes/mini_swe_agent/build_tool.sh
#   bash examples/blackbox_recipes/mini_swe_agent/build_tool.sh --pip-index https://pypi.tuna.tsinghua.edu.cn/simple/
#   bash examples/blackbox_recipes/mini_swe_agent/build_tool.sh --registry swr.cn-east-3.myhuaweicloud.com/openyuanrong
#
# Environment:
#   TOOL_IMAGE     image name (default: mini-swe-agent-tool)
#   TOOL_TAG       image tag  (default: latest)
#   PIP_INDEX_URL  pip index URL; overridden by --pip-index
#
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
IMAGE_NAME="${TOOL_IMAGE:-mini-swe-agent-tool}"
IMAGE_TAG="${TOOL_TAG:-latest}"

# Fail with a readable message when an option is given without its value.
# Without this check `set -u` aborts on the unbound "$2" with a cryptic error.
require_value() {
    if [[ $# -lt 2 ]]; then
        echo "Missing value for $1" >&2
        exit 1
    fi
}

# Parse args
REGISTRY=""
PIP_INDEX_URL="${PIP_INDEX_URL:-}"
while [[ $# -gt 0 ]]; do
    case "$1" in
        --registry)  require_value "$@"; REGISTRY="$2"; shift 2 ;;
        --pip-index) require_value "$@"; PIP_INDEX_URL="$2"; shift 2 ;;
        *) echo "Unknown arg: $1" >&2; exit 1 ;;
    esac
done

# A trailing slash would produce an invalid "registry//image" reference.
REGISTRY="${REGISTRY%/}"

BUILD_ARGS=()
if [[ -n "${PIP_INDEX_URL}" ]]; then
    BUILD_ARGS+=(--build-arg PIP_INDEX_URL="${PIP_INDEX_URL}")
fi

echo "==> Building mini_swe tool image: ${IMAGE_NAME}:${IMAGE_TAG}"
# The ${arr[@]+...} form expands to nothing for an empty array. A plain
# "${BUILD_ARGS[@]}" is treated as an unbound variable under `set -u` on
# bash < 4.4, which would abort the default (no --pip-index) build.
docker build \
    -f "${SCRIPT_DIR}/Dockerfile.mini-swe-agent-tool" \
    -t "${IMAGE_NAME}:${IMAGE_TAG}" \
    ${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"} \
    "${SCRIPT_DIR}/"

if [[ -n "${REGISTRY}" ]]; then
    FULL_TAG="${REGISTRY}/${IMAGE_NAME}:${IMAGE_TAG}"
    echo "==> Tagging and pushing: ${FULL_TAG}"
    docker tag "${IMAGE_NAME}:${IMAGE_TAG}" "${FULL_TAG}"
    docker push "${FULL_TAG}"
    echo "    Pushed."
fi

echo ""
echo "Tool image ready: ${IMAGE_NAME}:${IMAGE_TAG}"
if [[ -n "${REGISTRY}" ]]; then
    echo "  Remote sandbox: ${FULL_TAG}"
fi
On a single 8-GPU node this recipe +# uses 4 GPUs for trainer and 4 GPUs for standalone rollout. + +hydra: + searchpath: + - pkg://verl.trainer.config + +defaults: + - ppo_megatron_trainer + - _self_ + +actor_rollout_ref: + hybrid_engine: true + nccl_timeout: 9600 + + model: + path: ??? + + rollout: + name: vllm + mode: async + nnodes: 1 + n_gpus_per_node: 4 + prompt_length: 4096 + response_length: 131072 + max_model_len: 135168 + temperature: 1.0 + top_p: 1.0 + top_k: -1 + n: 8 + tensor_model_parallel_size: 4 + gpu_memory_utilization: 0.7 + calculate_log_probs: true + enable_sleep_mode: true + free_cache_engine: true + enable_chunked_prefill: true + max_num_batched_tokens: 135168 + checkpoint_engine: + backend: nccl + update_weights_bucket_megabytes: 2048 + + multi_turn: + enable: true + max_parallel_calls: 1 + format: qwen3_coder + + agent: + num_workers: 8 + agent_loop_manager_class: uni_agent.framework.entry.AgentFrameworkRolloutAdapter + + custom: + agent_framework: + gateway_count: 1 + agent_runners: + swe_agent: + runner_fqn: examples.blackbox_recipes.mini_swe_agent.mini_swe_agent_runner.mini_swe_agent_runner + dispatch_mode: ray_task + max_concurrent_sessions: 32 + runner_kwargs: + tool_image: swr.cn-east-3.myhuaweicloud.com/openyuanrong/mini-swe-agent-tool:latest + run_timeout: 3600 + conda_env: testbed + + actor: + use_dynamic_bsz: true + use_rollout_log_probs: true + ppo_mini_batch_size: 16 + ppo_micro_batch_size_per_gpu: 1 + use_kl_loss: false + kl_loss_coef: 0.0 + clip_ratio_low: 0.2 + clip_ratio_high: 0.28 + clip_ratio_c: 10.0 + loss_agg_mode: token-mean + entropy_coeff: 0 + optim: + lr: 1e-6 + weight_decay: 0.1 + lr_decay_style: constant + megatron: + param_offload: true + grad_offload: true + optimizer_offload: true + tensor_model_parallel_size: 4 + pipeline_model_parallel_size: 1 + context_parallel_size: 1 + use_mbridge: true + use_remove_padding: false + + ref: + log_prob_micro_batch_size_per_gpu: 1 + megatron: + param_offload: false + 
def extract_image(env_config: "dict | None") -> str:
    """Extract the Docker image from an env config.

    Two layouts are supported:

    Flat:   env_config["image"]
    Nested: env_config["deployment"]["image"]

    Args:
        env_config: The ``env`` section of a sample's ``tools_kwargs``. May be
            ``None`` or empty (e.g. a null value in the dataset row).

    Returns:
        The image reference, or ``""`` when no non-empty image is present.
    """
    # A null/empty config has no image. Returning "" here lets the caller raise
    # its own "no image" error instead of an AttributeError on ``None.get``.
    if not env_config:
        return ""

    # The flat layout wins when both are present.
    image = env_config.get("image")
    if image:
        return image

    deployment = env_config.get("deployment")
    if isinstance(deployment, dict):
        image = deployment.get("image")
        if image:
            return image
    return ""
+""" + +from __future__ import annotations + +import base64 +import json +import logging +import os +import shlex +import time +from pathlib import Path + +import httpx + +from examples.blackbox_recipes.mini_swe_agent.dataset import extract_image +from examples.blackbox_recipes.mini_swe_agent.reward import build_reward_context, evaluate_in_env +from examples.blackbox_recipes.sandbox_client import ( + SandboxClient, + extract_upstream, + rewrite_gateway_url, +) +from uni_agent.gateway.session import SessionHandle + +logger = logging.getLogger(__name__) + +DEFAULT_TOOL_IMAGE = "swr.cn-east-3.myhuaweicloud.com/openyuanrong/mini-swe-agent-tool:latest" + + +class SandboxEnvForReward: + """Adapts :class:`Sandbox` to the async env interface used by + reward specs (``communicate``, ``write_file``, ``read_file``). + """ + + def __init__(self, sandbox): + self._sandbox = sandbox + + async def communicate(self, input: str, timeout=600, check="ignore", error_msg="Command failed") -> str: + result = await self._sandbox.run(input, timeout=int(timeout)) + if check == "raise" and result.exit_code != 0: + raise RuntimeError(f"{error_msg}: {result.stdout[:200]}") + return result.stdout + + async def write_file(self, path: str | Path, content: str) -> None: + encoded = base64.b64encode(content.encode()).decode() + await self.communicate(f"echo {encoded} | base64 -d > {path}", check="raise", error_msg=f"write {path}") + + async def read_file(self, path: str | Path, **_) -> str: + return await self.communicate(f"cat {path}") + + +def _extract_task(raw_prompt) -> str: + """Extract task text from raw_prompt (str or message list).""" + if isinstance(raw_prompt, str): + return raw_prompt + return next( + (m["content"] for m in raw_prompt if isinstance(m, dict) and m.get("role") == "user"), + str(raw_prompt), + ) + + +def _build_task_config( + *, + task: str, + gateway_url: str, +) -> dict: + """Build the task config passed to run_agent.py via stdin.""" + agent_gateway_url = 
async def mini_swe_agent_runner(
    *,
    raw_prompt,
    session: SessionHandle,
    sample_index: int,
    tools_kwargs: dict | None = None,
    tool_image: str = DEFAULT_TOOL_IMAGE,
    run_timeout: int = 7200,
    conda_env: str = "testbed",
    sandbox_max_retries: int = 10,
    **kwargs,
) -> None:
    """Run mini-swe-agent inside a sandbox with sidecar tool mount.

    Flow:
    1. Create remote sandbox with mini-swe-agent sidecar
    2. Pipe task config to run_agent.py via stdin
    3. Parse agent result from stdout
    4. Evaluate reward in the same sandbox
    5. Post reward_info for the framework reward path

    Args:
        raw_prompt: Task prompt, either a string or a chat message list.
        session: Gateway session; ``base_url`` and ``reward_info_url`` are read.
        sample_index: Index of the sample, used in log and error messages.
        tools_kwargs: Per-sample config; its ``env`` section supplies the
            sandbox image and an optional ``post_setup_cmd``.
        tool_image: Sidecar tool image passed to the sandbox.
        run_timeout: Timeout (seconds) for the agent command in the sandbox.
        conda_env: Conda env name used to build the agent command.
        sandbox_max_retries: ``max_retries`` passed to sandbox creation.
        **kwargs: Accepted and ignored.

    Raises:
        ValueError: If no sandbox image, gateway URL, or reward_info_url is
            available.
        Exception: Any failure after sandbox creation is logged and re-raised;
            sandbox cleanup is attempted in every case.
    """
    tools_kwargs = tools_kwargs or {}
    logger.info("mini_swe_agent_runner called, sample_index=%d", sample_index)

    # Extract task text and sandbox config (image from parquet)
    task = _extract_task(raw_prompt)
    logger.info("task extracted, %d chars", len(task))

    env_config = tools_kwargs.get("env", {})
    image = extract_image(env_config)
    if not image:
        raise ValueError(f"No sandbox image found in tools_kwargs.env for sample {sample_index}")

    # Gateway URL — extract upstream for tunnel
    gateway_url = session.base_url
    if not gateway_url:
        raise ValueError(f"gateway_url is empty for sample {sample_index}")

    upstream = extract_upstream(gateway_url)
    sandbox = await SandboxClient.create(
        image=image,
        sidecar_image=tool_image,
        upstream=upstream,
        max_retries=int(sandbox_max_retries),
    )
    sandbox_id = sandbox.sandbox_id
    logger.info("Sandbox created (image=%s, sandbox_id=%s)", image, sandbox_id)

    # Everything from here on runs under try/finally so the sandbox is always
    # released, including when building the task config fails (for example an
    # AGENT_MAX_TURNS value that is not an integer).
    try:
        # Build task config (gateway URL rewritten to sandbox-internal tunnel)
        task_config = _build_task_config(
            task=task,
            gateway_url=gateway_url,
        )

        # Run post_setup_cmd if provided (e.g. git checkout correct commit)
        post_setup_cmd = env_config.get("post_setup_cmd", "")
        if post_setup_cmd:
            logger.info("Running post_setup_cmd (%d chars)...", len(post_setup_cmd))
            r = await sandbox.run(post_setup_cmd, timeout=600)
            if r.exit_code != 0:
                # A failed setup is only logged; the agent still runs.
                logger.warning("post_setup_cmd failed (rc=%d): %s", r.exit_code, r.stdout[:200])
            else:
                logger.info("post_setup_cmd done")

        # Run agent inside sandbox — pipe config via base64-encoded stdin.
        config_b64 = base64.b64encode(json.dumps(task_config).encode()).decode()
        agent_cmd = build_agent_command(config_b64=config_b64, conda_env=conda_env)
        logger.debug("[sample %d] starting agent inside sandbox", sample_index)
        t0 = time.perf_counter()
        agent_result = await sandbox.run(agent_cmd, timeout=int(run_timeout))
        elapsed = time.perf_counter() - t0
        logger.debug(
            "[sample %d] agent process finished: rc=%d (%.1fs)",
            sample_index,
            agent_result.exit_code,
            elapsed,
        )

        # Parse agent result from stdout
        agent_info = _parse_agent_result(agent_result.stdout, sample_index)
        if not isinstance(agent_info, dict):
            # Valid JSON that is not an object (null, list, number) carries no
            # result fields; treat it like an unparseable result.
            agent_info = {"exit_status": "error", "submission": ""}
        logger.info(
            "[sample %d] agent: exit_status=%s, submission=%d chars",
            sample_index,
            agent_info.get("exit_status"),
            # ``or ""`` also covers an explicit JSON null submission.
            len(agent_info.get("submission") or ""),
        )

        # Evaluate reward in the same sandbox
        metadata, eval_timeout = build_reward_context(tools_kwargs)
        t0 = time.perf_counter()
        reward_env = SandboxEnvForReward(sandbox)
        score, eval_result = await evaluate_in_env(reward_env, metadata, eval_timeout)
        logger.debug(
            "[sample %d] reward done: score=%s, resolved=%s (%.1fs)",
            sample_index,
            score,
            eval_result.get("resolved"),
            time.perf_counter() - t0,
        )

        reward_info = {"reward_score": score, **eval_result}
        if not session.reward_info_url:
            raise ValueError(f"reward_info_url is empty for session {session.session_id}")
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(session.reward_info_url, json={"reward_info": reward_info})
            response.raise_for_status()

    except Exception as e:
        logger.warning("Mini-swe-agent runner failed for sample %d (sandbox_id=%s): %s", sample_index, sandbox_id, e)
        raise
    finally:
        # Cleanup is best-effort and must not mask the primary outcome, but a
        # failure here is logged because it can leave a sandbox behind.
        try:
            await sandbox.cleanup()
        except Exception:
            logger.warning("Sandbox cleanup failed (sandbox_id=%s)", sandbox_id, exc_info=True)
+ The last line starting with '{' is the result JSON. + """ + stdout = stdout.strip() + if not stdout: + return {"exit_status": "error", "submission": ""} + # Try the last line that looks like JSON first + lines = [ln.strip() for ln in stdout.split("\n") if ln.strip()] + for line in reversed(lines): + if line.startswith("{"): + try: + return json.loads(line) + except json.JSONDecodeError: + continue + # Fallback: try entire stdout + try: + return json.loads(stdout) + except json.JSONDecodeError: + logger.warning("[sample %d] Failed to parse agent result (full stdout): %s", sample_index, stdout[:1000]) + return {"exit_status": "error", "submission": ""} diff --git a/examples/blackbox_recipes/mini_swe_agent/parallel_infer.py b/examples/blackbox_recipes/mini_swe_agent/parallel_infer.py new file mode 100644 index 00000000..cd792cd5 --- /dev/null +++ b/examples/blackbox_recipes/mini_swe_agent/parallel_infer.py @@ -0,0 +1,379 @@ +"""Standalone inference runner for the blackbox mini-swe-agent recipe. + +Spins up vLLM + gateway + a reward worker, runs agent sessions in parallel, +and reports resolve rate. Does NOT start the Megatron trainer. + +Reuses the recipe's existing training config +(config/swe_agent_blackbox_megatron_v1.yaml); its megatron/optimizer sections +are inert here since this driver never builds the actor worker group — only +the rollout, agent_framework, model, and reward sections are read. 
+ +Usage: + python examples/blackbox_recipes/mini_swe_agent/parallel_infer.py \ + --model-path ~/models/Qwen3.5-9B \ + --data-path ~/data/swe_agent/swe_bench_verified.parquet \ + --max-samples 10 +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +from typing import Any +from uuid import uuid4 + +import numpy as np +import ray + +from verl.experimental.reward_loop.reward_loop import RewardLoopWorker +from verl.utils import tensordict_utils as tu +from verl.utils.transferqueue_utils import tq +from verl.workers.rollout.llm_server import LLMServerManager + +from uni_agent.framework.entry import build_agent_framework, build_gateway_manager + +logging.basicConfig( + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + level=os.getenv("VERL_LOGGING_LEVEL", "INFO"), + force=True, +) +logger = logging.getLogger(__name__) + +# ── Recipe-specific constants (only these two differ between recipes) ────── +_CONFIG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config") +_CONFIG_NAME = "swe_agent_blackbox_megatron_v1" +_DEFAULT_TOOL_IMAGE = "swr.cn-east-3.myhuaweicloud.com/openyuanrong/mini-swe-agent-tool:latest" + + +# ===================================================================== +# Dataset loading (inlined; keeps the driver self-contained) +# ===================================================================== + + +def _remap_image_to_local(image_name: str) -> str: + parts = image_name.split("/") + if len(parts) > 1 and "." 
in parts[0]: + basename = parts[-1] + else: + basename = image_name + basename = basename.replace("_1776_", "__") + if ":" in basename: + basename = basename.rsplit(":", 1)[0] + return f"{basename}:latest" + + +def _remap_sample_images(sample: dict[str, Any]) -> dict[str, Any]: + extra_info = sample.get("extra_info") + if not extra_info: + return sample + tools_kwargs = extra_info.get("tools_kwargs", {}) + env = tools_kwargs.get("env", {}) + image = env.get("image") + if not image: + return sample + local_image = _remap_image_to_local(image) + if local_image != image: + logger.debug("Remapping image: %s -> %s", image, local_image) + env["image"] = local_image + return sample + + +def _inject_reward_fields(sample: dict[str, Any]) -> None: + extra_info = sample.get("extra_info", {}) + tools_kwargs = extra_info.get("tools_kwargs", {}) + reward_config = tools_kwargs.get("reward", {}) + sample.setdefault("data_source", reward_config.get("name", "unknown")) + sample.setdefault("reward_model", {"ground_truth": {}}) + + +def load_swe_dataset(data_path: str, max_samples: int = -1) -> list[dict[str, Any]]: + import pyarrow.parquet as pq + + path = os.path.expanduser(data_path) + logger.info("Loading dataset from: %s", path) + samples = pq.read_table(path).to_pylist() + for i, sample in enumerate(samples): + samples[i] = _remap_sample_images(sample) + _inject_reward_fields(samples[i]) + if max_samples > 0: + samples = samples[:max_samples] + logger.info("Loaded %d samples", len(samples)) + return samples + + +# ===================================================================== +# Config +# ===================================================================== + + +def _load_config( + *, + model_path: str, + engine: str, + prompt_length: int, + response_length: int, + temperature: float, + top_p: float, + n: int, + nnodes: int, + n_gpus_per_node: int, + tensor_parallel_size: int, + gateway_count: int, + max_concurrent_sessions: int, + tool_image: str | None, + run_timeout: 
def _build_prompts(samples: list[dict[str, Any]]) -> tuple[Any, list[str]]:
    """Pack the loaded samples into the prompt batch passed to the framework.

    Each sample receives a fresh ``uuid4`` string as its ``uid``. Every sample
    must already carry ``prompt``, ``data_source`` and ``reward_model`` keys.

    Args:
        samples: Dataset rows as plain dicts.

    Returns:
        ``(prompts, uids)``: the object built by ``tu.get_tensordict`` and the
        generated uids in sample order.
    """
    raw_prompts = [sample["prompt"] for sample in samples]
    uids = [str(uuid4()) for _ in samples]
    # Both levels may be null in the dataset row. ``or {}`` covers a present
    # but None ``tools_kwargs`` as well, which would otherwise make
    # ``dict(None)`` raise TypeError. ``dict(...)`` makes a shallow copy.
    tools_kwargs_list = [
        dict((sample.get("extra_info") or {}).get("tools_kwargs") or {}) for sample in samples
    ]
    prompts = tu.get_tensordict(
        tensor_dict={
            "raw_prompt": raw_prompts,
            "uid": uids,
            "data_source": [sample["data_source"] for sample in samples],
            "reward_model": [sample["reward_model"] for sample in samples],
            "tools_kwargs": tools_kwargs_list,
        },
        non_tensor_dict={"global_steps": 0},
    )
    return prompts, uids
def run_inference(
    *,
    model_path: str,
    data_path: str,
    prompt_length: int,
    response_length: int,
    temperature: float,
    top_p: float,
    n: int,
    max_samples: int,
    engine: str,
    nnodes: int,
    n_gpus_per_node: int,
    tensor_parallel_size: int,
    gateway_count: int,
    max_concurrent_sessions: int,
    tool_image: str | None,
    run_timeout: int,
) -> dict[str, Any]:
    """Run agent sessions over the dataset and report the resolve rate.

    Initializes Ray if needed, composes the config, loads the samples, starts
    the LLM server manager, gateway manager and a reward worker, runs
    ``framework.generate_sequences`` and aggregates the captured scores.

    Args:
        model_path: Model path written into the rollout config.
        data_path: Parquet file with the samples.
        prompt_length, response_length, temperature, top_p, n, engine, nnodes,
            n_gpus_per_node, tensor_parallel_size, gateway_count,
            max_concurrent_sessions, tool_image, run_timeout: Forwarded to
            ``_load_config`` as config overrides.
        max_samples: Forwarded to ``load_swe_dataset``.

    Returns:
        The summary dict produced by ``_report``.

    Raises:
        ValueError: If the dataset yields no samples.
    """
    if not ray.is_initialized():
        ray.init()

    config = _load_config(
        model_path=model_path,
        engine=engine,
        prompt_length=prompt_length,
        response_length=response_length,
        temperature=temperature,
        top_p=top_p,
        n=n,
        nnodes=nnodes,
        n_gpus_per_node=n_gpus_per_node,
        tensor_parallel_size=tensor_parallel_size,
        gateway_count=gateway_count,
        max_concurrent_sessions=max_concurrent_sessions,
        tool_image=tool_image,
        run_timeout=run_timeout,
    )

    samples = load_swe_dataset(data_path, max_samples=max_samples)
    if not samples:
        raise ValueError("No samples to process")

    logger.info("Initializing LLM server manager...")
    llm_server_manager = LLMServerManager.create(config=config)
    llm_client = llm_server_manager.get_client()

    gateway_manager = build_gateway_manager(config=config, llm_client=llm_client)
    # From here on the gateway must be shut down on every exit path, not only
    # after a fully successful run.
    try:
        reward_worker = ray.remote(RewardLoopWorker).remote(config, None)
        framework = build_agent_framework(
            config=config,
            gateway_manager=gateway_manager,
            reward_loop_worker_handles=[reward_worker],
        )

        prompts, uids = _build_prompts(samples)
        # Only the score map is consumed here; the status map is not needed.
        captured_scores, _ = _install_tq_capture()

        logger.info("Starting %d sample(s), %d session(s) each...", len(samples), n)
        try:
            asyncio.run(framework.generate_sequences(prompts))
        except RuntimeError as exc:
            # Still report on whatever scores were captured before the failure.
            logger.warning("generate_sequences failed: %s", exc)

        if not captured_scores:
            logger.warning(
                "No trajectory scores captured — all rollouts may have failed (see the "
                "generate_sequences summary above), or the TransferQueue monkeypatch did not "
                "reach the writer; resolve rate will be reported as 0."
            )

        return _report(samples, uids, captured_scores)
    finally:
        asyncio.run(gateway_manager.shutdown())
parser.add_argument("--run-timeout", type=int, default=7200) + parser.add_argument("--max-turns", type=int, default=100) + args = parser.parse_args() + + # Set before ray.init so runner Ray tasks inherit it. + os.environ["AGENT_MAX_TURNS"] = str(args.max_turns) + + run_inference( + model_path=args.model_path, + data_path=args.data_path, + prompt_length=args.prompt_length, + response_length=args.response_length, + temperature=args.temperature, + top_p=args.top_p, + n=args.n, + max_samples=args.max_samples, + engine=args.engine, + nnodes=args.nnodes, + n_gpus_per_node=args.n_gpus_per_node, + tensor_parallel_size=args.tensor_parallel_size, + gateway_count=args.gateway_count, + max_concurrent_sessions=args.max_concurrent_sessions, + tool_image=args.tool_image, + run_timeout=args.run_timeout, + ) + + +if __name__ == "__main__": + main() diff --git a/examples/blackbox_recipes/mini_swe_agent/reward.py b/examples/blackbox_recipes/mini_swe_agent/reward.py new file mode 100644 index 00000000..267cfea5 --- /dev/null +++ b/examples/blackbox_recipes/mini_swe_agent/reward.py @@ -0,0 +1,74 @@ +"""Reward utilities for the blackbox SWE-agent recipe. 
def build_reward_context(tools_kwargs: dict) -> tuple[dict[str, Any], int]:
    """Extract reward metadata and eval_timeout from per-sample tools_kwargs.

    Args:
        tools_kwargs: Per-sample mapping; only its ``"reward"`` entry is read.

    Returns:
        ``(metadata, eval_timeout)`` where ``metadata`` has the keys
        ``data_source`` (``reward["name"]``, default ``"unknown"``) and
        ``reward_model`` (``reward["metadata"]``, default ``{}``), and
        ``eval_timeout`` is ``SWE_AGENT_EVAL_TIMEOUT`` from the environment
        as an int (default 600).

    Raises:
        ValueError: If ``SWE_AGENT_EVAL_TIMEOUT`` is set but not an integer.
    """
    # ``or`` rather than a ``.get`` default: a key that is present but None
    # must behave like a missing key instead of crashing on ``None.get``.
    reward_config = tools_kwargs.get("reward") or {}
    metadata = {
        "data_source": reward_config.get("name") or "unknown",
        "reward_model": reward_config.get("metadata") or {},
    }
    eval_timeout = int(os.environ.get("SWE_AGENT_EVAL_TIMEOUT", "600"))
    return metadata, eval_timeout


def compute_score(data_source: str, solution_str: str, ground_truth: str, extra_info=None) -> dict:
    """Read reward_score from extra_info, injected by the agent runner.

    ``data_source``, ``solution_str`` and ``ground_truth`` are accepted but
    not used by this function.

    Returns:
        ``{"score": float}``; 0.0 when ``extra_info`` is empty or carries no
        (or a ``None``) ``reward_score``.
    """
    score = 0.0
    if extra_info:
        raw_score = extra_info.get("reward_score")
        # A None score is treated like a missing one instead of raising TypeError.
        if raw_score is not None:
            score = float(raw_score)
    return {"score": score}


def _get_reward_spec(data_source: str):
    """Load reward spec class by data_source name.

    Looks the name up in ``REWARD_SPEC_REGISTRY``; if it is not registered
    yet, ``_load_reward_spec_module`` is called once before looking again.

    Raises:
        ValueError: If the name is still unknown after the load attempt.
    """
    # Imported lazily so this module can be imported without uni_agent.
    from uni_agent.reward.registry import REWARD_SPEC_REGISTRY, _load_reward_spec_module

    if data_source not in REWARD_SPEC_REGISTRY:
        _load_reward_spec_module(data_source)
    cls = REWARD_SPEC_REGISTRY.get(data_source)
    if cls is None:
        raise ValueError(f"Unknown data_source: {data_source}. Available: {list(REWARD_SPEC_REGISTRY.keys())}")
    return cls
def _fail(msg: str, exit_status: str = "error") -> None:
    """Emit a single-line JSON error record on stdout.

    The record carries ``exit_status``, an empty ``submission`` and the
    message under ``error``. The function flushes stdout and then returns
    normally; it does not terminate the process.
    """
    error_record = {"exit_status": exit_status, "submission": "", "error": msg}
    out = sys.stdout
    out.write(json.dumps(error_record))
    out.write("\n")
    out.flush()
Create LocalEnvironment (use swebench defaults) + from minisweagent.environments.local import LocalEnvironment + + env_cfg = dict(swebench_cfg.get("environment", {})) + env_cfg.pop("environment_class", None) + env_cfg["timeout"] = DEFAULT_ACTION_TIMEOUT + env_cfg.setdefault("env", {}) + env_cfg["env"].setdefault("GIT_PAGER", "cat") + for key in ( + "image", + "container_timeout", + "run_args", + "executable", + "pull_timeout", + "forward_env", + "interpreter", + ): + env_cfg.pop(key, None) + env = LocalEnvironment(**env_cfg) + + # 4. Create LitellmModel pointing at gateway + from minisweagent.models.litellm_model import LitellmModel + + model_defaults = dict(swebench_cfg.get("model", {})) + model_defaults.pop("model_name", None) + model_defaults.pop("model_kwargs", None) + model_cfg = model_defaults + model_cfg.update( + { + "model_name": "openai/default", + "model_kwargs": { + "api_base": gateway_url, + "api_key": "not-needed", + "drop_params": True, + }, + "cost_tracking": "ignore_errors", + } + ) + model = LitellmModel(**model_cfg) + + # 5. Create DefaultAgent + from minisweagent.agents.default import DefaultAgent + + agent_defaults = dict(swebench_cfg.get("agent", {})) + agent_overrides = config.get("agent", {}) + agent_defaults.update(agent_overrides) + agent_cfg = agent_defaults + step_limit = agent_cfg.get("step_limit", 100) + agent_cfg["step_limit"] = step_limit + agent = DefaultAgent(model, env, **agent_cfg) + + # 6. Run agent + try: + info = agent.run(task=task) + except Exception as e: + info = {"exit_status": type(e).__name__, "submission": str(e)} + + # 7. 
#!/usr/bin/env bash
# Standalone inference for the blackbox mini-swe-agent recipe.
# Runs rollout + reward only (no Megatron trainer) and reports resolve rate.
#
# Usage:
#   bash examples/blackbox_recipes/mini_swe_agent/run_infer.sh
#
# All configurable via environment variables (see defaults below).

# -u makes every expansion of an unset variable fatal, so each optional
# variable below is read through a ${VAR:-default} expansion.
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="${REPO_ROOT:-$(cd "${SCRIPT_DIR}/../../.." && pwd)}"
cd "${REPO_ROOT}"

# ── Model & data ─────────────────────────────────────────────────────────
MODEL_PATH="${MODEL_PATH:-${HOME}/models/Qwen3.5-9B}"
DATA_PATH="${DATA_PATH:-${HOME}/data/swe_agent/swe_bench_verified.parquet}"

# ── Inference parameters ─────────────────────────────────────────────────
MAX_SAMPLES="${MAX_SAMPLES:--1}"
PROMPT_LENGTH="${PROMPT_LENGTH:-4096}"
RESPONSE_LENGTH="${RESPONSE_LENGTH:-131072}"
TEMPERATURE="${TEMPERATURE:-1.0}"
TOP_P="${TOP_P:-1.0}"
N="${N:-1}"
ENGINE="${ENGINE:-vllm}"
TP="${TP:-4}"
NNODES="${NNODES:-1}"
N_GPUS_PER_NODE="${N_GPUS_PER_NODE:-8}"
GATEWAY_COUNT="${GATEWAY_COUNT:-1}"
MAX_CONCURRENT_SESSIONS="${MAX_CONCURRENT_SESSIONS:-8}"

# ── Agent parameters ─────────────────────────────────────────────────────
AGENT_MAX_TURNS="${AGENT_MAX_TURNS:-100}"
SWE_AGENT_TOOL_IMAGE="${SWE_AGENT_TOOL_IMAGE:-swr.cn-east-3.myhuaweicloud.com/openyuanrong/mini-swe-agent-tool:latest}"
SWE_AGENT_RUN_TIMEOUT="${SWE_AGENT_RUN_TIMEOUT:-7200}"

# ── AKernel (remote sandbox) ─────────────────────────────────────────────
export AKERNEL_SERVER_ADDRESS="${AKERNEL_SERVER_ADDRESS:-}"
export AKERNEL_TOKEN="${AKERNEL_TOKEN:-}"
export AKERNEL_TUNNEL_SSL_VERIFY="${AKERNEL_TUNNEL_SSL_VERIFY:-0}"

# ── Logging & env ────────────────────────────────────────────────────────
export VERL_LOGGING_LEVEL="${VERL_LOGGING_LEVEL:-INFO}"
export ROLLOUT_GPU_MEM_UTIL="${ROLLOUT_GPU_MEM_UTIL:-0.7}"
export AGENT_MAX_TURNS
export SWE_AGENT_EVAL_TIMEOUT="${SWE_AGENT_EVAL_TIMEOUT:-600}"
export PYTHONPATH="${REPO_ROOT}:${REPO_ROOT}/verl:${PYTHONPATH:-}"

echo "=== Mini-SWE-Agent Blackbox Inference ==="
echo "Model: ${MODEL_PATH}"
echo "Data: ${DATA_PATH}"
echo "Max samples: ${MAX_SAMPLES}"
echo "Engine: ${ENGINE} (TP=${TP})"
echo "Tool image: ${SWE_AGENT_TOOL_IMAGE}"
echo "Batch: n=${N}, gateway=${GATEWAY_COUNT}, max_sessions=${MAX_CONCURRENT_SESSIONS}"
# GATEWAY_MESSAGE_JSONL_PATH is optional and has no default above; the ":-"
# expansion keeps `set -u` from aborting the script when it is not set.
if [[ -n "${GATEWAY_MESSAGE_JSONL_PATH:-}" ]]; then
    echo "Messages: ${GATEWAY_MESSAGE_JSONL_PATH}"
fi
echo "========================================="

python examples/blackbox_recipes/mini_swe_agent/parallel_infer.py \
    --model-path "${MODEL_PATH}" \
    --data-path "${DATA_PATH}" \
    --max-samples "${MAX_SAMPLES}" \
    --prompt-length "${PROMPT_LENGTH}" \
    --response-length "${RESPONSE_LENGTH}" \
    --temperature "${TEMPERATURE}" \
    --top-p "${TOP_P}" \
    --n "${N}" \
    --engine "${ENGINE}" \
    --tensor-parallel-size "${TP}" \
    --nnodes "${NNODES}" \
    --n-gpus-per-node "${N_GPUS_PER_NODE}" \
    --gateway-count "${GATEWAY_COUNT}" \
    --max-concurrent-sessions "${MAX_CONCURRENT_SESSIONS}" \
    --tool-image "${SWE_AGENT_TOOL_IMAGE}" \
    --run-timeout "${SWE_AGENT_RUN_TIMEOUT}" \
    --max-turns "${AGENT_MAX_TURNS}"
&& pwd)}" +cd "${REPO_ROOT}" + +# ── Model & data ───────────────────────────────────────────────────────── +MODEL_PATH="${MODEL_PATH:-${HOME}/models/Qwen3.5-9B}" +TRAIN_DATA="${TRAIN_DATA:-${HOME}/data/swe_agent/swe_rebench_filtered.parquet}" +VAL_DATA="${VAL_DATA:-${HOME}/data/swe_agent/swe_bench_verified.parquet}" +RUNTIME_ENV="${RUNTIME_ENV:-}" + +# ── V1 trainer ─────────────────────────────────────────────────────────── +TRAINER_MODE="${TRAINER_MODE:-separate_async}" +NUM_WARMUP_BATCHES="${NUM_WARMUP_BATCHES:-1}" +SEPARATE_NUM_WARMUP_BATCHES="${SEPARATE_NUM_WARMUP_BATCHES:-${NUM_WARMUP_BATCHES}}" +PARAMETER_SYNC_STEP="${PARAMETER_SYNC_STEP:-4}" +RAY_SUBMIT_MODE="${RAY_SUBMIT_MODE:-job}" +RAY_INIT_ADDRESS="${RAY_INIT_ADDRESS:-auto}" +RAY_STATUS_TIMEOUT="${RAY_STATUS_TIMEOUT:-5}" +CONFIG_NAME="${CONFIG_NAME:-swe_agent_blackbox_megatron_v1}" + +# ── Hardware ───────────────────────────────────────────────────────────── +NNODES="${NNODES:-${NNODES_TRAIN:-1}}" +PHYSICAL_GPUS_PER_NODE="${PHYSICAL_GPUS_PER_NODE:-8}" +if [[ "${TRAINER_MODE}" == "separate_async" ]]; then + N_GPUS_PER_NODE="${N_GPUS_PER_NODE:-${TRAIN_NGPUS_PER_NODE:-4}}" + ROLLOUT_NNODES="${ROLLOUT_NNODES:-${NNODES_ROLLOUT:-${NNODES}}}" + ROLLOUT_NGPUS_PER_NODE="${ROLLOUT_NGPUS_PER_NODE:-${NGPUS_PER_NODE_ROLLOUT:-4}}" +else + N_GPUS_PER_NODE="${N_GPUS_PER_NODE:-${TRAIN_NGPUS_PER_NODE:-${PHYSICAL_GPUS_PER_NODE}}}" + ROLLOUT_NNODES="${ROLLOUT_NNODES:-${NNODES_ROLLOUT:-0}}" + ROLLOUT_NGPUS_PER_NODE="${ROLLOUT_NGPUS_PER_NODE:-${NGPUS_PER_NODE_ROLLOUT:-${N_GPUS_PER_NODE}}}" +fi + +# ── Algorithm ──────────────────────────────────────────────────────────── +CLIP_RATIO_LOW="${CLIP_RATIO_LOW:-0.2}" +CLIP_RATIO_HIGH="${CLIP_RATIO_HIGH:-0.28}" +ACTOR_LR="${ACTOR_LR:-1e-6}" + +# ── Sequence lengths ───────────────────────────────────────────────────── +PROMPT_LENGTH="${PROMPT_LENGTH:-4096}" +RESPONSE_LENGTH="${RESPONSE_LENGTH:-131072}" +MAX_MODEL_LEN=$((PROMPT_LENGTH + RESPONSE_LENGTH)) + +# ── Rollout parameters 
─────────────────────────────────────────────────── +ENGINE="${ENGINE:-vllm}" +if [[ "${TRAINER_MODE}" == "separate_async" ]]; then + GEN_TP="${GEN_TP:-${TP:-${ROLLOUT_NGPUS_PER_NODE}}}" +else + GEN_TP="${GEN_TP:-${TP:-2}}" +fi +N="${N:-8}" +TEMPERATURE="${TEMPERATURE:-1.0}" +TOP_P="${TOP_P:-1.0}" +TOP_K="${TOP_K:--1}" +ROLLOUT_GPU_MEM_UTIL="${ROLLOUT_GPU_MEM_UTIL:-0.7}" +UPDATE_WEIGHTS_BUCKET_MB="${UPDATE_WEIGHTS_BUCKET_MB:-2048}" + +# ── Megatron training parallelism ──────────────────────────────────────── +if [[ "${TRAINER_MODE}" == "separate_async" ]]; then + TRAIN_TP="${TRAIN_TP:-${TP:-${N_GPUS_PER_NODE}}}" +else + TRAIN_TP="${TRAIN_TP:-${TP:-8}}" +fi +TRAIN_PP="${TRAIN_PP:-1}" +TRAIN_CP="${TRAIN_CP:-1}" +OFFLOAD="${OFFLOAD:-True}" +OPTIMIZER_OFFLOAD_FRACTION="${OFFLOAD_FRACTION:-1.0}" +USE_MBRIDGE="${USE_MBRIDGE:-True}" +PPO_MINI_BATCH_SIZE="${PPO_MINI_BATCH_SIZE:-16}" + +# ── Agent parameters ───────────────────────────────────────────────────── +# AGENT_MAX_TURNS is the agent's turn budget inside the sandbox: it becomes the +# mini-swe-agent step_limit (read by the runner via the AGENT_MAX_TURNS env var). +# Note: the trainer's multi_turn.max_assistant_turns is NOT enforced on the +# blackbox rollout path (AgentFrameworkRolloutAdapter), so it is not exposed here. 
+RUNNER="${RUNNER:-mini_swe}" +AGENT_MAX_TURNS="${AGENT_MAX_TURNS:-100}" +if [[ "${RUNNER}" == "mini_swe" ]]; then + AGENT_RUNNER_FQN="examples.blackbox_recipes.mini_swe_agent.mini_swe_agent_runner.mini_swe_agent_runner" + SWE_AGENT_TOOL_IMAGE="${SWE_AGENT_TOOL_IMAGE:-swr.cn-east-3.myhuaweicloud.com/openyuanrong/mini-swe-agent-tool:latest}" +else + echo "Unknown RUNNER=${RUNNER}; this recipe currently supports mini_swe only" >&2 + exit 1 +fi +SWE_AGENT_RUN_TIMEOUT="${SWE_AGENT_RUN_TIMEOUT:-7200}" +CONDA_ENV="${CONDA_ENV:-testbed}" +GATEWAY_COUNT="${GATEWAY_COUNT:-1}" +MAX_CONCURRENT_SESSIONS="${MAX_CONCURRENT_SESSIONS:-32}" +NUM_AGENT_WORKERS="${NUM_AGENT_WORKERS:-8}" +RUNNER_ARGS=( + "actor_rollout_ref.rollout.agent.agent_loop_manager_class=uni_agent.framework.entry.AgentFrameworkRolloutAdapter" + "actor_rollout_ref.rollout.custom.agent_framework.gateway_count=${GATEWAY_COUNT}" + "actor_rollout_ref.rollout.custom.agent_framework.agent_runners.swe_agent.runner_fqn=${AGENT_RUNNER_FQN}" + "actor_rollout_ref.rollout.custom.agent_framework.agent_runners.swe_agent.dispatch_mode=ray_task" + "actor_rollout_ref.rollout.custom.agent_framework.agent_runners.swe_agent.max_concurrent_sessions=${MAX_CONCURRENT_SESSIONS}" + "actor_rollout_ref.rollout.custom.agent_framework.agent_runners.swe_agent.runner_kwargs.tool_image=${SWE_AGENT_TOOL_IMAGE}" + "actor_rollout_ref.rollout.custom.agent_framework.agent_runners.swe_agent.runner_kwargs.run_timeout=${SWE_AGENT_RUN_TIMEOUT}" + "actor_rollout_ref.rollout.custom.agent_framework.agent_runners.swe_agent.runner_kwargs.conda_env=${CONDA_ENV}" +) + +# ── AKernel (remote sandbox) ───────────────────────────────────────────── +AKERNEL_SERVER_ADDRESS="${AKERNEL_SERVER_ADDRESS:-}" +AKERNEL_TOKEN="${AKERNEL_TOKEN:-}" +AKERNEL_TUNNEL_SSL_VERIFY="${AKERNEL_TUNNEL_SSL_VERIFY:-0}" + +# ── Logging & checkpointing ────────────────────────────────────────────── +PROJECT_NAME="${PROJECT_NAME:-swe_agent_blackbox}" 
+EXPERIMENT_NAME="${EXPERIMENT_NAME:-swe_agent_$(date +%Y%m%d_%H%M)}" +SAVE_FREQ="${SAVE_FREQ:-10}" +TEST_FREQ="${TEST_FREQ:-10}" +TOTAL_EPOCHS="${TOTAL_EPOCHS:-10}" +TOTAL_TRAINING_STEPS="${TOTAL_TRAINING_STEPS:-}" +VAL_BEFORE_TRAIN="${VAL_BEFORE_TRAIN:-true}" +CKPTS_DIR="${CKPTS_DIR:-checkpoints/${PROJECT_NAME}/${EXPERIMENT_NAME}}" +TRAIN_MAX_SAMPLES="${TRAIN_MAX_SAMPLES:-${MAX_SAMPLES:--1}}" +VAL_MAX_SAMPLES="${VAL_MAX_SAMPLES:-${MAX_SAMPLES:--1}}" +TRAIN_BATCH_SIZE="${TRAIN_BATCH_SIZE:-${PPO_MINI_BATCH_SIZE}}" +VAL_BATCH_SIZE="${VAL_BATCH_SIZE:-${TRAIN_BATCH_SIZE}}" + +export AGENT_MAX_TURNS +export SWE_AGENT_EVAL_TIMEOUT="${SWE_AGENT_EVAL_TIMEOUT:-600}" +export SWE_AGENT_TOOL_IMAGE +export SWE_AGENT_RUN_TIMEOUT +export CONDA_ENV +export GATEWAY_COUNT +export AKERNEL_SERVER_ADDRESS +export AKERNEL_TOKEN +export AKERNEL_TUNNEL_SSL_VERIFY +export PYTHONPATH="${REPO_ROOT}:${REPO_ROOT}/verl:${PYTHONPATH:-}" + +echo "=== SWE-Agent Blackbox Megatron Async Training ===" +echo "Model: ${MODEL_PATH}" +echo "Train data: ${TRAIN_DATA}" +echo "Val data: ${VAL_DATA}" +echo "Engine: ${ENGINE} (gen_tp=${GEN_TP}, train_tp=${TRAIN_TP})" +echo "Runner: ${RUNNER}" +echo "Turns: agent_max_turns=${AGENT_MAX_TURNS}" +echo "Batch: n=${N}, mini_bsz=${PPO_MINI_BATCH_SIZE}" +echo "Sequence: prompt=${PROMPT_LENGTH}, response=${RESPONSE_LENGTH}" +echo "Trainer: V1 ${TRAINER_MODE}" +if [[ "${TRAINER_MODE}" == "separate_async" ]]; then + echo "Resources: trainer=${NNODES}x${N_GPUS_PER_NODE}, rollout=${ROLLOUT_NNODES}x${ROLLOUT_NGPUS_PER_NODE}" +else + echo "Resources: colocated=${NNODES}x${N_GPUS_PER_NODE}" +fi +echo "Samples: train_max=${TRAIN_MAX_SAMPLES}, val_max=${VAL_MAX_SAMPLES}" +echo "===================================================" + +# ── Compute derived parameters ─────────────────────────────────────────── +ACTOR_PPO_MAX_TOKEN_LEN=$(( (PROMPT_LENGTH + RESPONSE_LENGTH) / TRAIN_CP )) +INFER_PPO_MAX_TOKEN_LEN=$(( (PROMPT_LENGTH + RESPONSE_LENGTH) / TRAIN_CP )) + 
+RUNTIME_ENV_ARGS=() +if [ -n "${RUNTIME_ENV}" ]; then + RUNTIME_ENV_ARGS=(--runtime-env "${RUNTIME_ENV}") +else + RUNTIME_ENV_JSON="$( + python3 - <<'PY' +import json +import os + +env_vars = { + key: value + for key in ( + "PYTHONPATH", + "AKERNEL_SERVER_ADDRESS", + "AKERNEL_TOKEN", + "AKERNEL_TUNNEL_SSL_VERIFY", + "AGENT_MAX_TURNS", + "SWE_AGENT_EVAL_TIMEOUT", + "SWE_AGENT_TOOL_IMAGE", + "SWE_AGENT_RUN_TIMEOUT", + "CONDA_ENV", + "GATEWAY_COUNT", + ) + if (value := os.environ.get(key)) is not None +} +env_vars.setdefault("TRANSFER_QUEUE_ENABLE", "") +env_vars.setdefault("NCCL_P2P_DISABLE", "1") +env_vars.setdefault("NCCL_SHM_DISABLE", "1") +print(json.dumps({"env_vars": env_vars})) +PY + )" + RUNTIME_ENV_ARGS=(--runtime-env-json "${RUNTIME_ENV_JSON}") +fi + +# ── Ensure Ray is running ──────────────────────────────────────────────── +if [[ "${TRAINER_MODE}" == "separate_async" ]]; then + TOTAL_GPUS=$(( NNODES * N_GPUS_PER_NODE + ROLLOUT_NNODES * ROLLOUT_NGPUS_PER_NODE )) +else + TOTAL_GPUS=$(( NNODES * N_GPUS_PER_NODE )) +fi +if ! timeout "${RAY_STATUS_TIMEOUT}" ray status &>/dev/null; then + echo "Starting Ray cluster (${TOTAL_GPUS} GPUs)..." + ray start --head --num-gpus="${TOTAL_GPUS}" --disable-usage-stats +else + echo "Ray cluster already running." 
+fi + +# ── Launch ──────────────────────────────────────────────────────────────── +WORKING_DIR="${WORKING_DIR:-$(pwd)}" + +MAIN_CMD=( + python3 -m verl.trainer.main_ppo + --config-name="${CONFIG_NAME}" \ + --config-path="${REPO_ROOT}/examples/blackbox_recipes/mini_swe_agent/config" \ + hydra.searchpath=[pkg://verl.trainer.config] \ + +ray_kwargs.ray_init.address="${RAY_INIT_ADDRESS}" \ + trainer.use_v1=True \ + trainer.v1.trainer_mode="${TRAINER_MODE}" \ + trainer.v1.colocate_async.num_warmup_batches=${NUM_WARMUP_BATCHES} \ + trainer.v1.separate_async.num_warmup_batches=${SEPARATE_NUM_WARMUP_BATCHES} \ + trainer.v1.separate_async.parameter_sync_step=${PARAMETER_SYNC_STEP} \ + transfer_queue.enable=True \ + actor_rollout_ref.model.path="${MODEL_PATH}" \ + data.train_files="['${TRAIN_DATA}']" \ + data.val_files="['${VAL_DATA}']" \ + data.train_max_samples=${TRAIN_MAX_SAMPLES} \ + data.val_max_samples=${VAL_MAX_SAMPLES} \ + data.train_batch_size=${TRAIN_BATCH_SIZE} \ + data.val_batch_size=${VAL_BATCH_SIZE} \ + data.max_prompt_length=${PROMPT_LENGTH} \ + data.max_response_length=${RESPONSE_LENGTH} \ + actor_rollout_ref.rollout.n=${N} \ + actor_rollout_ref.rollout.name=${ENGINE} \ + actor_rollout_ref.rollout.prompt_length=${PROMPT_LENGTH} \ + actor_rollout_ref.rollout.response_length=${RESPONSE_LENGTH} \ + actor_rollout_ref.rollout.max_model_len=${MAX_MODEL_LEN} \ + actor_rollout_ref.rollout.max_num_batched_tokens=${MAX_MODEL_LEN} \ + actor_rollout_ref.rollout.temperature=${TEMPERATURE} \ + actor_rollout_ref.rollout.top_p=${TOP_P} \ + actor_rollout_ref.rollout.top_k=${TOP_K} \ + actor_rollout_ref.rollout.checkpoint_engine.update_weights_bucket_megabytes=${UPDATE_WEIGHTS_BUCKET_MB} \ + actor_rollout_ref.rollout.nnodes=${ROLLOUT_NNODES} \ + actor_rollout_ref.rollout.n_gpus_per_node=${ROLLOUT_NGPUS_PER_NODE} \ + actor_rollout_ref.rollout.tensor_model_parallel_size=${GEN_TP} \ + actor_rollout_ref.rollout.gpu_memory_utilization=${ROLLOUT_GPU_MEM_UTIL} \ + 
actor_rollout_ref.rollout.agent.num_workers=${NUM_AGENT_WORKERS} \ + "${RUNNER_ARGS[@]}" \ + actor_rollout_ref.actor.clip_ratio_low=${CLIP_RATIO_LOW} \ + actor_rollout_ref.actor.clip_ratio_high=${CLIP_RATIO_HIGH} \ + actor_rollout_ref.actor.ppo_mini_batch_size=${PPO_MINI_BATCH_SIZE} \ + actor_rollout_ref.actor.ppo_max_token_len_per_gpu=${ACTOR_PPO_MAX_TOKEN_LEN} \ + actor_rollout_ref.actor.optim.lr=${ACTOR_LR} \ + +actor_rollout_ref.actor.optim.override_optimizer_config.optimizer_offload_fraction=${OPTIMIZER_OFFLOAD_FRACTION} \ + +actor_rollout_ref.actor.optim.override_optimizer_config.overlap_cpu_optimizer_d2h_h2d=True \ + +actor_rollout_ref.actor.optim.override_optimizer_config.use_precision_aware_optimizer=True \ + +actor_rollout_ref.actor.optim.override_optimizer_config.optimizer_cpu_offload=True \ + actor_rollout_ref.actor.megatron.param_offload=${OFFLOAD} \ + actor_rollout_ref.actor.megatron.grad_offload=${OFFLOAD} \ + actor_rollout_ref.actor.megatron.optimizer_offload=${OFFLOAD} \ + actor_rollout_ref.actor.megatron.tensor_model_parallel_size=${TRAIN_TP} \ + actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=${TRAIN_PP} \ + actor_rollout_ref.actor.megatron.context_parallel_size=${TRAIN_CP} \ + actor_rollout_ref.actor.megatron.use_mbridge=${USE_MBRIDGE} \ + actor_rollout_ref.ref.megatron.param_offload=${OFFLOAD} \ + actor_rollout_ref.ref.megatron.tensor_model_parallel_size=${TRAIN_TP} \ + actor_rollout_ref.ref.megatron.pipeline_model_parallel_size=${TRAIN_PP} \ + actor_rollout_ref.ref.megatron.context_parallel_size=${TRAIN_CP} \ + actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=1 \ + actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=${INFER_PPO_MAX_TOKEN_LEN} \ + actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=1 \ + actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=${INFER_PPO_MAX_TOKEN_LEN} \ + trainer.project_name="${PROJECT_NAME}" \ + trainer.experiment_name="${EXPERIMENT_NAME}" \ + 
trainer.total_epochs=${TOTAL_EPOCHS} \ + trainer.val_before_train=${VAL_BEFORE_TRAIN} \ + trainer.save_freq=${SAVE_FREQ} \ + trainer.test_freq=${TEST_FREQ} \ + trainer.default_local_dir="${CKPTS_DIR}" \ + trainer.nnodes=${NNODES} \ + trainer.n_gpus_per_node=${N_GPUS_PER_NODE} \ + "$@" +) + +if [[ -n "${TOTAL_TRAINING_STEPS}" ]]; then + MAIN_CMD+=(trainer.total_training_steps=${TOTAL_TRAINING_STEPS}) +fi + +if [[ "${RAY_SUBMIT_MODE}" == "job" ]]; then + ray job submit --no-wait --working-dir="${WORKING_DIR}" "${RUNTIME_ENV_ARGS[@]}" -- "${MAIN_CMD[@]}" +elif [[ "${RAY_SUBMIT_MODE}" == "local" ]]; then + "${MAIN_CMD[@]}" +else + echo "Unknown RAY_SUBMIT_MODE=${RAY_SUBMIT_MODE}; expected job or local" >&2 + exit 1 +fi diff --git a/examples/blackbox_recipes/sandbox_client.py b/examples/blackbox_recipes/sandbox_client.py new file mode 100644 index 00000000..631c8722 --- /dev/null +++ b/examples/blackbox_recipes/sandbox_client.py @@ -0,0 +1,209 @@ +"""AKernel remote sandbox command execution. + +AKernel is an agent sandbox infra collaboratively developed by the +OpenYuanrong team and the Ant AKernel team. +Uses ``akernel_sdk.Sandbox`` with sidecar ``Mount`` to inject the +mini-swe-agent tool image. Supports upstream tunnel so the agent +inside the sandbox can reach the gateway via ``http://127.0.0.1:``. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import uuid +from dataclasses import dataclass +from typing import Any +from urllib.parse import urlparse + + +@dataclass +class CommandResult: + """Result of a command executed inside a sandbox.""" + + stdout: str + stderr: str + exit_code: int + + +logger = logging.getLogger(__name__) + +DEFAULT_PROXY_PORT = 38197 + + +def _configure_akernel_env() -> None: + """Validate AKernel credentials and map the tunnel SSL flag for akernel_sdk. 
def _resolve_sandbox_name() -> str | None:
    """Return ``{prefix}{random}`` when ``SANDBOX_NAME_PREFIX`` env is set.

    The random part is the first 8 hex characters of a fresh UUID4. Returns
    ``None`` when the variable is unset or empty.
    """
    prefix = os.getenv("SANDBOX_NAME_PREFIX")
    if not prefix:
        return None
    return f"{prefix}{uuid.uuid4().hex[:8]}"


def extract_upstream(gateway_url: str) -> str:
    """Extract host:port from a gateway URL for upstream tunnel config.

    When the URL carries no explicit port, the scheme's default port is used
    (80 for ``http``, 443 for ``https``).

    Example: "http://8.92.9.155:40169/sessions/abc/v1" -> "8.92.9.155:40169"

    Raises:
        ValueError: If the URL has no host, or has neither an explicit port
            nor a scheme with a known default port.
    """
    parsed = urlparse(gateway_url)
    host = parsed.hostname
    if not host:
        raise ValueError(f"Gateway URL has no host: {gateway_url!r}")

    port = parsed.port
    if port is None:
        # Without this fallback the result would be the literal "host:None".
        port = {"http": 80, "https": 443}.get(parsed.scheme)
        if port is None:
            raise ValueError(f"Gateway URL has no port and no default for its scheme: {gateway_url!r}")
    return f"{host}:{port}"
class SandboxClient:
    """Command execution via remote sandbox.

    Wraps a sandbox handle created by ``akernel_sdk.Sandbox``. Construction,
    ``commands.run`` and ``kill`` are dispatched through
    ``asyncio.to_thread``; ``is_running`` is called directly.
    """

    def __init__(self, sandbox: Any) -> None:
        self._sandbox = sandbox

    @property
    def sandbox_id(self) -> str:
        """The wrapped sandbox's ``sandbox_id``, or ``"unknown"`` if absent."""
        return getattr(self._sandbox, "sandbox_id", "unknown")

    @classmethod
    async def create(
        cls,
        *,
        image: str,
        sidecar_image: str,
        upstream: str = "",
        proxy_port: int = DEFAULT_PROXY_PORT,
        env: dict[str, str] | None = None,
        cpu: int = 1000,
        memory: int = 2048,
        cpu_limit: int = 4000,
        mem_limit: int = 8192,
        idle_timeout: int = 7200,
        sidecar_target: str = "/opt/mini-swe-agent",
        max_retries: int = 10,
        **sandbox_kwargs: Any,
    ) -> SandboxClient:
        """Create a sandbox client with the sidecar tool mounted.

        The sidecar image is mounted at ``sidecar_target`` inside the
        sandbox via ``akernel_sdk.Mount``.

        If ``upstream`` is provided, a tunnel is set up so the sandbox can
        reach the local gateway via ``http://127.0.0.1:<proxy_port>``.

        Args:
            image: Sandbox base image.
            sidecar_image: Image mounted at ``sidecar_target``.
            upstream: ``host:port`` of the gateway; empty disables the tunnel
                arguments.
            proxy_port: Only passed to the SDK when ``upstream`` is set.
            env: Optional environment for the sandbox; omitted when empty.
            cpu, memory, cpu_limit, mem_limit, idle_timeout: Passed through
                to ``Sandbox`` unchanged.
            max_retries: Number of creation attempts.
            **sandbox_kwargs: Extra ``Sandbox`` arguments; applied last, so
                they override any of the values above.

        Raises:
            ValueError: From ``_configure_akernel_env`` when credentials are
                missing.
            RuntimeError: When every creation attempt failed; chained to the
                last underlying error.
        """
        _configure_akernel_env()
        # Imported lazily so the module can be imported without akernel_sdk.
        from akernel_sdk import Mount, Sandbox

        sb_kwargs: dict[str, Any] = {
            "image": image,
            "cpu": cpu,
            "memory": memory,
            "cpu_limit": cpu_limit,
            "mem_limit": mem_limit,
            "idle_timeout": idle_timeout,
            "mounts": [
                Mount(target=sidecar_target, image_url=sidecar_image),
            ],
        }
        if upstream:
            sb_kwargs["upstream"] = upstream
            sb_kwargs["proxy_port"] = proxy_port
        if env:
            sb_kwargs["env"] = env
        name = _resolve_sandbox_name()
        if name is not None:
            sb_kwargs["name"] = name
        sb_kwargs.update(sandbox_kwargs)

        logger.info(
            "Creating sandbox (image=%s, cpu=%d, memory=%d, sidecar=%s:%s, upstream=%s, name=%s)",
            image,
            cpu,
            memory,
            sidecar_image,
            sidecar_target,
            upstream or "none",
            name or "auto",
        )
        last_error: Exception | None = None
        for retry in range(max_retries):
            sandbox = None
            try:
                sandbox = await asyncio.to_thread(Sandbox, **sb_kwargs)
                logger.info("sandbox created: %s", getattr(sandbox, "sandbox_id", "?"))
                return cls(sandbox=sandbox)
            except Exception as exc:
                last_error = exc
                sandbox_id = getattr(sandbox, "sandbox_id", None)
                logger.critical(
                    "Failed to create sandbox (sandbox_id=%s): %s",
                    sandbox_id or "n/a",
                    exc,
                )
                if sandbox is not None:
                    try:
                        await asyncio.to_thread(sandbox.kill)
                    except Exception as kill_exc:
                        # Best-effort cleanup: keep retrying, but leave a trace
                        # instead of dropping the failure silently.
                        logger.debug(
                            "Failed to kill partially created sandbox %s: %s",
                            sandbox_id or "n/a",
                            kill_exc,
                        )
                if retry < max_retries - 1:
                    # Exponential backoff capped at 30 s; no sleep after the last attempt.
                    sleep_time = min(30, 2**retry)
                    logger.info("Retrying sandbox creation in %d seconds...", sleep_time)
                    await asyncio.sleep(sleep_time)

        raise RuntimeError(f"Failed to create sandbox after {max_retries} retries") from last_error

    async def run(self, cmd: str, *, timeout: int = 600) -> CommandResult:
        """Execute *cmd* inside the sandbox via ``sandbox.commands.run``.

        Never raises: any exception from the SDK call is returned as a
        ``CommandResult`` with ``exit_code=-1`` and the message in ``stderr``.
        """
        try:
            result = await asyncio.to_thread(
                self._sandbox.commands.run,
                cmd,
                timeout=timeout,
            )
            return CommandResult(
                stdout=getattr(result, "stdout", ""),
                stderr=getattr(result, "stderr", ""),
                exit_code=getattr(result, "exit_code", -1),
            )
        except Exception as e:
            return CommandResult(stdout="", stderr=str(e), exit_code=-1)

    async def cleanup(self) -> None:
        """Kill the sandbox if still running.

        Failures are logged as warnings rather than raised. The handle is
        dropped afterwards, so a second call is a no-op.
        """
        if self._sandbox is not None:
            sandbox_id = getattr(self._sandbox, "sandbox_id", "?")
            try:
                if self._sandbox.is_running():
                    await asyncio.to_thread(self._sandbox.kill)
                    logger.info("sandbox %s killed", sandbox_id)
                else:
                    logger.info("sandbox %s already stopped", sandbox_id)
            except Exception as e:
                logger.warning("Failed to kill sandbox %s: %s", sandbox_id, e)
            self._sandbox = None
def get_image_name(dataset_id: str, instance_id: str) -> str:
    """Build the openyuanrong registry image name for a swe-rebench instance.

    *instance_id* is split on ``"__"`` and must yield exactly two pieces
    (checked with ``assert``); both pieces are lower-cased before being
    placed into the image name.  *dataset_id* is accepted but not used.
    """
    segments = instance_id.split("__")
    assert len(segments) == 2
    project_name, instance_number = segments[0].lower(), segments[1].lower()
    return f"swr.cn-east-3.myhuaweicloud.com/openyuanrong/swe-rebench/{project_name}_1776_{instance_number}:latest"
a/uni_agent/interaction/model.py +++ b/uni_agent/interaction/model.py @@ -138,7 +138,7 @@ async def query( return response_str, [], rollout_cache, generation_info async def _get_new_message_ids(self, new_messages: list[dict[str, Any]]) -> list[int]: - from verl.utils.chat_template import apply_chat_template + from verl.utils.tokenizer.chat_template import apply_chat_template from verl.utils.tokenizer import normalize_token_ids tokenized_prompt = await self.loop.run_in_executor( @@ -154,7 +154,7 @@ async def _get_new_message_ids(self, new_messages: list[dict[str, Any]]) -> list @cached_property def message_boundary_tokens(self) -> list[int]: - from verl.utils.chat_template import apply_chat_template + from verl.utils.tokenizer.chat_template import apply_chat_template from verl.utils.tokenizer import normalize_token_ids dummy_history = [ diff --git a/verl b/verl index 7aed6b23..6fef6a7a 160000 --- a/verl +++ b/verl @@ -1 +1 @@ -Subproject commit 7aed6b230776f963fa09509c10d9c3a767d1102c +Subproject commit 6fef6a7a699435cad84e8907e9121457e41eed04