diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/README.md b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/README.md new file mode 100644 index 00000000..a85eb401 --- /dev/null +++ b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/README.md @@ -0,0 +1,266 @@ +# Contrib Model: Qwen3-Coder-480B-A35B-Instruct on AWS Trainium2 + +Optimized NxD Inference configuration for serving **Qwen3-Coder-480B-A35B-Instruct** on `trn2.48xlarge` via vLLM. + +## Model Information + +- **HuggingFace ID:** [`Qwen/Qwen3-Coder-480B-A35B-Instruct`](https://huggingface.co/Qwen/Qwen3-Coder-480B-A35B-Instruct) +- **Model Type:** Mixture-of-Experts (MoE) decoder-only transformer +- **Total Parameters:** 480B (35B active per token) +- **License:** Check HuggingFace model card + +## Architecture Details + +Qwen3-Coder-480B shares the `qwen3_moe` architecture with Qwen3-235B-A22B but differs in several dimensions that affect Neuron compilation and optimization: + +| Parameter | Qwen3-Coder-480B | Qwen3-235B-A22B | +|-----------|-------------------|------------------| +| num_key_value_heads | **8** | 4 | +| num_attention_heads | **96** | 64 | +| head_dim | **192** | 128 | +| hidden_size | **6144** | 5120 | +| num_hidden_layers | **62** | 94 | +| num_experts | **160** | 128 | +| num_experts_per_tok | 8 | 8 | +| moe_intermediate_size | 2560 | 2560 | +| max_position_embeddings | 262144 | 131072 | + +These differences -- particularly **8 KV heads** (vs 4) and **head_dim=192** (vs 128) -- required specific optimization work on SDK 2.28 to achieve full NKI kernel compatibility and determine HBM-safe operating points. + +## Hardware Requirements + +- **Instance:** `trn2.48xlarge` (64 NeuronCores, 32 Neuron devices) +- **LNC Mode:** LNC=2 (default, 24 GB HBM per logical core) +- **Disk:** 900+ GB for model weights (241 safetensor shards) +- **RAM:** ~1.5 TB system RAM during weight loading +- **Neuron SDK:** 2.28+ +- **DLAMI:** `Deep Learning AMI Neuron (Ubuntu 24.04) 20260227` + +## Quick Start (vLLM) + +### 1. Download Model + +```bash +pip install huggingface_hub[cli] +huggingface-cli download Qwen/Qwen3-Coder-480B-A35B-Instruct \ + --local-dir /mnt/nvme/Qwen3-Coder-480B-A35B-Instruct/ +``` + +### 2. Launch Server (Recommended Config) + +```bash +# Activate the pre-installed Neuron venv +source /opt/aws_neuronx_venv_pytorch_inference_vllm_0_13/bin/activate + +# Set compilation cache directory +export NEURON_COMPILED_ARTIFACTS=/mnt/nvme/neuron_cache +export NEURON_CC_FLAGS="--retry_failed_compilation" +export VLLM_NEURON_FRAMEWORK='neuronx-distributed-inference' +export MALLOC_ARENA_MAX=64 + +mkdir -p $NEURON_COMPILED_ARTIFACTS + +python -m vllm.entrypoints.openai.api_server \ + --model=/mnt/nvme/Qwen3-Coder-480B-A35B-Instruct/ \ + --tensor-parallel-size=64 \ + --max-num-seqs=16 \ + --max-model-len=8192 \ + --additional-config='{"override_neuron_config": { + "async_mode": true, + "attn_kernel_enabled": true, + "batch_size": 16, + "cc_pipeline_tiling_factor": 2, + "cp_degree": 1, + "ctx_batch_size": 1, + "enable_bucketing": true, + "flash_decoding_enabled": false, + "fused_qkv": true, + "is_continuous_batching": true, + "logical_nc_config": 2, + "max_context_length": 8192, + "mode_mask_padded_tokens": true, + "moe_ep_degree": 1, + "moe_tp_degree": 64, + "qkv_cte_nki_kernel_fuse_rope": true, + "qkv_kernel_enabled": true, + "qkv_nki_kernel_enabled": true, + "scratch_pad_size": 1024, + "seq_len": 8192, + "sequence_parallel_enabled": true, + "torch_dtype": "bfloat16", + "tp_degree": 64 + }}' \ + --no-enable-chunked-prefill \ + --no-enable-prefix-caching \ + --port=8000 +``` + +First launch compiles NEFFs (~22 min for 7 CTE + 7 TKG buckets). Subsequent launches load from cache (~10 min). + +### 3. Test + +```bash +curl -s http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "/mnt/nvme/Qwen3-Coder-480B-A35B-Instruct/", + "messages": [{"role": "user", "content": "Write a Python hello world"}], + "max_tokens": 128, + "temperature": 0.6 + }' | python3 -m json.tool +``` + +## Configurations + +Two pre-validated configs are provided in `configs/`: + +### Config A: `throughput_optimized.json` (Recommended) + +- **seq_len=8192, batch_size=16, pure TP=64** +- Auto-bucketing (128, 256, 512, 1024, 2048, 4096, 8192) +- All NKI QKV kernels + fused RoPE enabled +- Best for: Production serving, high concurrent throughput + +### Config B: `long_context.json` + +- **seq_len=16384, batch_size=8, pure TP=64** +- Single bucket (16384) +- Best for: Applications requiring 16K context, at the cost of throughput + +### Configuration Comparison + +| Metric | Config A (8192/BS=16) | Config B (16384/BS=8) | +|--------|----------------------|----------------------| +| TTFT (short prompt) | **0.85s** | 18.47s | +| Decode throughput | **15.3 tok/s** | ~8.5 tok/s | +| Max concurrent requests | **16** | 8 | +| Aggregate TPS @ 1 concurrent | **14.73** | 8.37 | +| Aggregate TPS @ 4 concurrent | **43.42** | 9.74 | +| Aggregate TPS @ 8 concurrent | **73.23** | 10.41 | +| Max context length | 8192 | **16384** | +| Compile time | ~22 min | ~15 min | +| Weight load time | ~10 min | ~10 min | +| Generation quality | Correct | Correct | + +## Benchmark Results + +Validated on `trn2.48xlarge`, SDK 2.28, `Deep Learning AMI Neuron (Ubuntu 24.04) 20260227`. + +### Config A: Throughput Optimized (Recommended) + +| Concurrency | Aggregate TPS | Avg per-request TPS | Avg Latency | +|-------------|--------------|---------------------|-------------| +| 1 | 14.73 | 14.73 | 17.4s | +| 2 | 28.14 | 14.07 | 18.2s | +| 4 | 43.42 | 10.86 | 23.6s | +| 8 | 73.23 | 9.15 | 28.0s | + +- **TTFT (short prompt):** 0.85s (auto-bucketing selects optimal bucket) +- **TTFT (8K prompt):** ~7.1s +- **Peak single-request decode:** 15.3 tok/s + +### Config B: Long Context (16384) + +| Concurrency | Aggregate TPS | Avg per-request TPS | Avg Latency | +|-------------|--------------|---------------------|-------------| +| 1 | 8.37 | 8.37 | 30.6s | +| 4 | 9.74 | 2.44 | 105.1s | +| 8 | 10.41 | 1.30 | 196.3s | + +## Known Issues and Limitations + +### 1. `flash_decoding_enabled` Must Be `false` + +Enabling `flash_decoding_enabled=true` causes: +``` +AssertionError: kv_shared parallel group is not initialized +``` +during `attention_tokengen`. The 8 KV heads / 96 Q heads ratio in pure TP=64 config is incompatible with the flash decoding path. + +### 2. NKI QKV Kernel Incompatible with `attention_dp_degree > 1` + +`qkv_nki_kernel_enabled=true` and `qkv_cte_nki_kernel_fuse_rope=true` fail with `attention_dp_degree > 1`: +``` +AssertionError: Output bfloat16 %out_tensor(1, 1024, 3584) has no store def +``` +Workaround: Use only `qkv_kernel_enabled=true` (legacy kernel) when `attention_dp_degree > 1`. + +### 3. Context Parallelism (`cp_degree=16`) Causes ICE with `head_dim=192` + +The Coder's `head_dim=192` creates attention tensors (64x192=12288) that exceed SBUF partition limits during CTE linking: +``` +Allocated memory out of bound {I-xxx}@SB<0,229376>(128x12288) +``` +`cp_degree=8` compiles successfully, but produces garbage output for `seq_len > 8192`. Only `cp_degree=1` (pure TP) produces correct results at all sequence lengths. + +### 4. Expert Parallelism Requires `batch_size >= 20` + +With `moe_ep_degree=32`, selective loading triggers when `batch_size * top_k / num_experts < 1.0`. For the Coder (160 experts, top_k=8): `batch_size >= 20` is required to bypass this. + +### 5. Maximum Context Length + +| seq_len | Max batch_size | Status | +|---------|---------------|--------| +| 8192 | 16 | Working (recommended) | +| 12288 | - | NEFF load OOM | +| 16384 | 8 | Working (reduced throughput) | + +HBM is the limiting factor: 24 GB per core at LNC=2, with `hidden_size=6144` producing larger I/O tensors than the 235B model. + +### 6. `on_device_sampling_config` Adds Per-Token Overhead + +Removing `on_device_sampling_config` from the NeuronConfig (letting vLLM handle sampling on CPU) improves decode throughput by ~11% (13.8 -> 15.3 tok/s) and aggregate throughput by 3.3x when combined with auto-bucketing. + +### 7. Auto-Bucketing is Critical for TTFT + +Without explicit bucket lists, NxDI auto-generates buckets (128, 256, 512, 1024, 2048, 4096, 8192). This reduces TTFT from 7.14s to 0.85s for short prompts (8.4x improvement), since short prompts no longer pad to the maximum sequence length. + +## Compilation Strategy + +First compilation takes ~22 minutes (Config A) or ~15 minutes (Config B). Use `NEURON_COMPILED_ARTIFACTS` to cache NEFFs: + +```bash +export NEURON_COMPILED_ARTIFACTS=/mnt/nvme/neuron_cache +mkdir -p $NEURON_COMPILED_ARTIFACTS +``` + +Subsequent server startups skip compilation and load from cache (~10 min for weight loading). + +If compilation fails with OOM, reduce `batch_size` first. The model's `hidden_size=6144` uses more HBM than the 235B model per sequence. + +## NxDI Direct Usage + +See `src/generation_qwen3_coder_demo.py` for a standalone NxDI example (without vLLM). + +## Compatibility Matrix + +| Instance/SDK | 2.28+ | 2.27 and earlier | +|--------------|-------|------------------| +| trn2.48xlarge | Working (recommended) | Limited (no NKI QKV kernels) | +| trn2.3xlarge | Not enough NeuronCores (needs TP=64) | N/A | +| trn1 / Inf2 | Not tested | Not tested | + +## Testing + +Run the benchmark script against a running vLLM server: + +```bash +# Start vLLM server (see Quick Start above), then in another terminal: +python3 contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/integration/test_model.py +``` + +Or with pytest: + +```bash +pytest contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/integration/test_model.py -v --capture=tee-sys +``` + +## Example Checkpoints + +- [Qwen/Qwen3-Coder-480B-A35B-Instruct](https://huggingface.co/Qwen/Qwen3-Coder-480B-A35B-Instruct) + +## Maintainer + +Jim Burtoft - Annapurna Labs + +**Last Updated:** 2026-03-10 diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/configs/long_context.json b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/configs/long_context.json new file mode 100644 index 00000000..6dc9705d --- /dev/null +++ b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/configs/long_context.json @@ -0,0 +1,53 @@ +{ + "description": "Qwen3-Coder-480B-A35B-Instruct: Long-context config (16K) for trn2.48xlarge", + "hardware": "trn2.48xlarge (64 NeuronCores, LNC=2)", + "sdk": "2.28+", + "vllm_flags": { + "tensor_parallel_size": 64, + "max_num_seqs": 8, + "max_model_len": 16384, + "no_enable_chunked_prefill": true, + "no_enable_prefix_caching": true + }, + "override_neuron_config": { + "async_mode": true, + "attn_kernel_enabled": true, + "batch_size": 8, + "cc_pipeline_tiling_factor": 2, + "context_encoding_buckets": [16384], + "cp_degree": 1, + "ctx_batch_size": 1, + "enable_bucketing": true, + "flash_decoding_enabled": false, + "fused_qkv": true, + "is_continuous_batching": true, + "logical_nc_config": 2, + "max_context_length": 16384, + "mode_mask_padded_tokens": true, + "moe_ep_degree": 1, + "moe_tp_degree": 64, + "on_device_sampling_config": {"do_sample": true, "temperature": 0.6, "top_k": 20, "top_p": 0.95}, + "qkv_cte_nki_kernel_fuse_rope": true, + "qkv_kernel_enabled": true, + "qkv_nki_kernel_enabled": true, + "scratch_pad_size": 1024, + "seq_len": 16384, + "sequence_parallel_enabled": true, + "token_generation_buckets": [16384], + "torch_dtype": "bfloat16", + "tp_degree": 64 + }, + "notes": { + "batch_size_limit": "Max batch_size=8 at 16384 (BS=12 OOM at NEFF load, BS=16 OOM at compile)", + "flash_decoding": "Must be false; kv_shared group assertion fails with 8 KV heads in pure TP=64", + "compile_time": "~15 min first compile, then cached", + "weight_load_time": "~10 min", + "tradeoff": "2x lower throughput vs 8192 config, but supports 16K context" + }, + "benchmark": { + "ttft": "18.47s", + "aggregate_tps_1_concurrent": "8.37 tok/s", + "aggregate_tps_4_concurrent": "9.74 tok/s", + "aggregate_tps_8_concurrent": "10.41 tok/s" + } +} diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/configs/throughput_optimized.json b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/configs/throughput_optimized.json new file mode 100644 index 00000000..177ca890 --- /dev/null +++ b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/configs/throughput_optimized.json @@ -0,0 +1,51 @@ +{ + "description": "Qwen3-Coder-480B-A35B-Instruct: Throughput-optimized config for trn2.48xlarge", + "hardware": "trn2.48xlarge (64 NeuronCores, LNC=2)", + "sdk": "2.28+", + "vllm_flags": { + "tensor_parallel_size": 64, + "max_num_seqs": 16, + "max_model_len": 8192, + "no_enable_chunked_prefill": true, + "no_enable_prefix_caching": true + }, + "override_neuron_config": { + "async_mode": true, + "attn_kernel_enabled": true, + "batch_size": 16, + "cc_pipeline_tiling_factor": 2, + "cp_degree": 1, + "ctx_batch_size": 1, + "enable_bucketing": true, + "flash_decoding_enabled": false, + "fused_qkv": true, + "is_continuous_batching": true, + "logical_nc_config": 2, + "max_context_length": 8192, + "mode_mask_padded_tokens": true, + "moe_ep_degree": 1, + "moe_tp_degree": 64, + "qkv_cte_nki_kernel_fuse_rope": true, + "qkv_kernel_enabled": true, + "qkv_nki_kernel_enabled": true, + "scratch_pad_size": 1024, + "seq_len": 8192, + "sequence_parallel_enabled": true, + "torch_dtype": "bfloat16", + "tp_degree": 64 + }, + "notes": { + "auto_bucketing": "No explicit bucket lists; enable_bucketing=true auto-generates: 128, 256, 512, 1024, 2048, 4096, 8192", + "no_on_device_sampling": "on_device_sampling_config is omitted; vLLM handles sampling on CPU for better throughput", + "flash_decoding": "Must be false; kv_shared group assertion fails with 8 KV heads in pure TP=64", + "compile_time": "~22 min first compile (7 CTE + 7 TKG buckets), then cached", + "weight_load_time": "~10 min" + }, + "benchmark": { + "ttft_short_prompt": "0.85s", + "decode_throughput": "15.3 tok/s (single request)", + "aggregate_tps_1_concurrent": "14.73 tok/s", + "aggregate_tps_4_concurrent": "43.42 tok/s", + "aggregate_tps_8_concurrent": "73.23 tok/s" + } +} diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/__init__.py b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/__init__.py new file mode 100644 index 00000000..717469db --- /dev/null +++ b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/__init__.py @@ -0,0 +1,3 @@ +# Qwen3-Coder-480B-A35B-Instruct contrib model +# This model uses the existing qwen3_moe architecture in NxD Inference. +# No custom modeling code is needed; this contrib provides optimized configs and scripts. diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/bench_qwen3_coder.py b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/bench_qwen3_coder.py new file mode 100644 index 00000000..e61a4f2e --- /dev/null +++ b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/bench_qwen3_coder.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python3 +""" +Benchmark script for Qwen3-Coder-480B-A35B-Instruct on vLLM/Neuron. + +Measures TTFT, decode throughput, concurrent throughput, and generation quality. +Requires a running vLLM server (see qwen3_coder_vllm.sh). + +Usage: + python bench_qwen3_coder.py # Full benchmark + python bench_qwen3_coder.py --quick # Quick smoke test + python bench_qwen3_coder.py --max-concurrency 4 # Limit concurrency +""" + +import argparse +import json +import sys +import threading +import time +import urllib.request + +import requests + +DEFAULT_API_URL = "http://localhost:8000" +DEFAULT_MODEL = "/mnt/nvme/Qwen3-Coder-480B-A35B-Instruct/" + + +def completions_request(api_url, model, prompt, max_tokens, temperature=0.6): + """Send a non-streaming completions request.""" + payload = { + "model": model, + "prompt": prompt, + "max_tokens": max_tokens, + "temperature": temperature, + "top_p": 0.95, + } + start = time.time() + resp = requests.post(f"{api_url}/v1/completions", json=payload, timeout=600) + elapsed = time.time() - start + + if resp.status_code != 200: + return {"error": f"HTTP {resp.status_code}: {resp.text[:200]}"} + + data = resp.json() + usage = data.get("usage", {}) + return { + "elapsed_s": elapsed, + "prompt_tokens": usage.get("prompt_tokens", 0), + "completion_tokens": usage.get("completion_tokens", 0), + "tokens_per_sec": usage.get("completion_tokens", 0) / elapsed + if elapsed > 0 + else 0, + "text": data["choices"][0]["text"] if data.get("choices") else "", + } + + +def streaming_request(api_url, model, prompt, max_tokens, temperature=0.6): + """Send a streaming request to measure TTFT.""" + payload = { + "model": model, + "prompt": prompt, + "max_tokens": max_tokens, + "temperature": temperature, + "top_p": 0.95, + "stream": True, + } + start = time.time() + ttft = None + token_count = 0 + try: + resp = requests.post( + f"{api_url}/v1/completions", json=payload, timeout=600, stream=True + ) + for line in resp.iter_lines(): + if line: + decoded = line.decode("utf-8") + if decoded.startswith("data: ") and decoded != "data: [DONE]": + if ttft is None: + ttft = time.time() - start + try: + chunk = json.loads(decoded[6:]) + if chunk.get("choices", [{}])[0].get("text"): + token_count += 1 + except json.JSONDecodeError: + pass + except Exception as e: + return {"error": str(e)} + + elapsed = time.time() - start + return { + "elapsed_s": elapsed, + "ttft_s": ttft, + "token_count": token_count, + "tokens_per_sec": token_count / elapsed if elapsed > 0 else 0, + "decode_tps": token_count / (elapsed - ttft) + if (ttft and elapsed > ttft) + else 0, + } + + +def chat_request(api_url, model, prompt, max_tokens, temperature=0.6): + """Send a chat completions request (for concurrent benchmark).""" + data = json.dumps( + { + "model": model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": max_tokens, + "temperature": temperature, + "top_p": 0.95, + "top_k": 20, + } + ).encode() + + start = time.time() + try: + req = urllib.request.Request( + f"{api_url}/v1/chat/completions", + data=data, + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=600) as resp: + result = json.loads(resp.read().decode()) + elapsed = time.time() - start + usage = result["usage"] + content = result["choices"][0]["message"]["content"] + return { + "elapsed": elapsed, + "completion_tokens": usage["completion_tokens"], + "prompt_tokens": usage["prompt_tokens"], + "tok_s": usage["completion_tokens"] / elapsed if elapsed > 0 else 0, + "ok": len(content.strip()) > 10, + } + except Exception as e: + return {"elapsed": time.time() - start, "error": str(e)} + + +CONCURRENT_PROMPTS = [ + "Write a Python function to compute the first N Fibonacci numbers.", + "Explain what tensor parallelism is in 3 sentences.", + "Write a hello world program in Rust.", + "What is 127 * 389? Show your work.", + "Translate to French: 'The quick brown fox jumps over the lazy dog.'", + "Write a Python function for binary search on a sorted list.", + "Explain the difference between TCP and UDP in simple terms.", + "Write a Python class that implements a stack with push, pop, and peek.", +] + + +def benchmark_concurrency(api_url, model, n_concurrent, max_tokens=256): + """Send n_concurrent requests simultaneously.""" + results = [None] * n_concurrent + threads = [] + + wall_start = time.time() + for i in range(n_concurrent): + prompt = CONCURRENT_PROMPTS[i % len(CONCURRENT_PROMPTS)] + t = threading.Thread( + target=lambda idx, p: results.__setitem__( + idx, chat_request(api_url, model, p, max_tokens) + ), + args=(i, prompt), + ) + threads.append(t) + t.start() + + for t in threads: + t.join(timeout=600) + wall_elapsed = time.time() - wall_start + + valid = [r for r in results if r and "error" not in r] + errors = [r for r in results if r and "error" in r] + + total_tokens = sum(r["completion_tokens"] for r in valid) + aggregate_tps = total_tokens / wall_elapsed if wall_elapsed > 0 else 0 + avg_per_req = sum(r["tok_s"] for r in valid) / len(valid) if valid else 0 + avg_latency = sum(r["elapsed"] for r in valid) / len(valid) if valid else 0 + all_ok = all(r.get("ok", False) for r in valid) + + return { + "concurrency": n_concurrent, + "wall_time": wall_elapsed, + "total_tokens": total_tokens, + "aggregate_tps": aggregate_tps, + "avg_per_req_tps": avg_per_req, + "avg_latency": avg_latency, + "success": len(valid), + "errors": len(errors), + "quality_ok": all_ok, + } + + +def run_benchmark(api_url, model, quick=False, max_concurrency=8): + print("=" * 70) + print("Qwen3-Coder-480B-A35B-Instruct Benchmark") + print(f"Server: {api_url}") + print("=" * 70) + + # Warmup + print("\n--- Warmup ---") + w = completions_request(api_url, model, "Hello", 16, temperature=0.1) + if "error" in w: + print(f"FAILED: {w['error']}") + sys.exit(1) + print(f"Warmup: {w['elapsed_s']:.2f}s, {w['completion_tokens']} tokens") + + # TTFT (streaming) + print("\n--- TTFT Measurement (streaming) ---") + ttft_tests = [ + ("Hello world", 32), + ("Write a Python function to sort a list", 64), + ] + if not quick: + ttft_tests.append( + ( + "Explain the architecture of a transformer model in detail, covering " + "attention mechanisms, feed-forward layers, and positional encoding", + 64, + ) + ) + + for prompt, max_tok in ttft_tests: + r = streaming_request(api_url, model, prompt, max_tok, temperature=0.1) + if "error" not in r: + print( + f" ~{len(prompt.split())} words | TTFT: {r['ttft_s']:.3f}s | " + f"Decode: {r['decode_tps']:.1f} tok/s | Tokens: {r['token_count']}" + ) + else: + print(f" ERROR: {r['error']}") + + # Single-request throughput + print("\n--- Single-Request Throughput ---") + test_cases = [ + ("Write a Python hello world program", 128), + ("Write a binary search function in Python with type hints", 256), + ] + if not quick: + test_cases.append( + ( + "Write a Python function that implements quicksort. Include docstring and type hints.", + 512, + ) + ) + + header = ( + f"{'PromptTok':<11} {'MaxTok':<8} {'OutTok':<8} {'Time(s)':<10} {'Tok/s':<8}" + ) + print(header) + print("-" * len(header)) + + for prompt, max_tok in test_cases: + r = completions_request(api_url, model, prompt, max_tok, temperature=0.1) + if "error" not in r: + print( + f"{r['prompt_tokens']:<11} {max_tok:<8} {r['completion_tokens']:<8} " + f"{r['elapsed_s']:<10.2f} {r['tokens_per_sec']:<8.2f}" + ) + + # Concurrent throughput + print("\n--- Concurrent Throughput ---") + header = f"{'Conc':<6} {'Wall(s)':<10} {'Tokens':<10} {'AggTPS':<10} {'AvgReqTPS':<12} {'Quality'}" + print(header) + print("-" * len(header)) + + for c in [1, 2, 4, 8]: + if c > max_concurrency: + break + r = benchmark_concurrency(api_url, model, c) + print( + f"{r['concurrency']:<6} {r['wall_time']:<10.2f} {r['total_tokens']:<10} " + f"{r['aggregate_tps']:<10.2f} {r['avg_per_req_tps']:<12.2f} " + f"{'PASS' if r['quality_ok'] else 'FAIL'}" + ) + + # Quality check + print("\n--- Generation Quality ---") + quality_prompts = [ + ( + "Write a Python function to compute the nth Fibonacci number using memoization:\n```python\n", + 256, + ), + ("What is 2 + 2? Answer with just the number:", 8), + ] + if not quick: + quality_prompts.append(("Translate to French: 'The cat sat on the mat.'", 32)) + + for prompt, max_tok in quality_prompts: + r = completions_request(api_url, model, prompt, max_tok, temperature=0.0) + if "error" not in r: + text = r["text"][:200].replace("\n", " ") + print(f" Prompt: {prompt[:50]}...") + print(f" Output ({r['completion_tokens']} tok): {text}") + else: + print(f" ERROR: {r['error']}") + + print("\n" + "=" * 70) + print("Benchmark complete") + print("=" * 70) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Benchmark Qwen3-Coder-480B on vLLM/Neuron" + ) + parser.add_argument("--api-url", default=DEFAULT_API_URL, help="vLLM server URL") + parser.add_argument( + "--model", default=DEFAULT_MODEL, help="Model path (as registered in vLLM)" + ) + parser.add_argument( + "--quick", action="store_true", help="Quick smoke test (fewer prompts)" + ) + parser.add_argument( + "--max-concurrency", type=int, default=8, help="Max concurrency level" + ) + args = parser.parse_args() + + run_benchmark( + args.api_url, args.model, quick=args.quick, max_concurrency=args.max_concurrency + ) diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/generation_qwen3_coder_demo.py b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/generation_qwen3_coder_demo.py new file mode 100644 index 00000000..0c960b67 --- /dev/null +++ b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/generation_qwen3_coder_demo.py @@ -0,0 +1,116 @@ +""" +NxD Inference direct usage example for Qwen3-Coder-480B-A35B-Instruct. + +This demonstrates using NxDI directly (without vLLM) for compile + inference. +For production serving, use the vLLM launch script instead. + +Requirements: + - trn2.48xlarge instance + - SDK 2.28+ + - Model weights at MODEL_PATH + +Usage: + # First run (compiles + infers): + python generation_qwen3_coder_demo.py + + # Subsequent runs (loads from cache): + python generation_qwen3_coder_demo.py --skip-compile +""" + +import argparse +import torch +from transformers import AutoTokenizer, GenerationConfig + +from neuronx_distributed_inference.models.config import MoENeuronConfig +from neuronx_distributed_inference.models.qwen3_moe.modeling_qwen3_moe import ( + Qwen3MoeInferenceConfig, + NeuronQwen3MoeForCausalLM, +) +from neuronx_distributed_inference.utils.hf_adapter import ( + HuggingFaceGenerationAdapter, + load_pretrained_config, +) + +MODEL_PATH = "/mnt/nvme/Qwen3-Coder-480B-A35B-Instruct/" +COMPILED_MODEL_PATH = "/mnt/nvme/Qwen3-Coder-480B-A35B-Instruct/traced_model/" + +DTYPE = torch.bfloat16 + + +def generate(skip_compile=False): + generation_config = GenerationConfig.from_pretrained(MODEL_PATH) + + if not skip_compile: + neuron_config = MoENeuronConfig( + tp_degree=64, + moe_tp_degree=64, + moe_ep_degree=1, + batch_size=16, + ctx_batch_size=1, + seq_len=8192, + scratchpad_page_size=1024, + torch_dtype=DTYPE, + enable_bucketing=True, + flash_decoding_enabled=False, + cp_degree=1, + fused_qkv=True, + is_continuous_batching=True, + logical_nc_config=2, + sequence_parallel_enabled=True, + qkv_kernel_enabled=True, + qkv_nki_kernel_enabled=True, + qkv_cte_nki_kernel_fuse_rope=True, + attn_kernel_enabled=True, + async_mode=True, + cc_pipeline_tiling_factor=2, + mode_mask_padded_tokens=True, + ) + config = Qwen3MoeInferenceConfig( + neuron_config, + load_config=load_pretrained_config(MODEL_PATH), + ) + tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, padding_side="right") + tokenizer.pad_token = tokenizer.eos_token + + print("\nCompiling model (first run takes ~22 min)...") + model = NeuronQwen3MoeForCausalLM(MODEL_PATH, config) + model.compile(COMPILED_MODEL_PATH) + tokenizer.save_pretrained(COMPILED_MODEL_PATH) + + print("\nLoading model from compiled checkpoint (~10 min)...") + model = NeuronQwen3MoeForCausalLM(COMPILED_MODEL_PATH) + model.load(COMPILED_MODEL_PATH) + tokenizer = AutoTokenizer.from_pretrained(COMPILED_MODEL_PATH) + + print("\nGenerating...") + prompt = "Write a Python function that implements quicksort with type hints." + messages = [{"role": "user", "content": prompt}] + text = tokenizer.apply_chat_template( + messages, + tokenize=False, + add_generation_prompt=True, + enable_thinking=True, + ) + inputs = tokenizer([text], padding=True, return_tensors="pt") + generation_model = HuggingFaceGenerationAdapter(model) + outputs = generation_model.generate( + inputs.input_ids, + generation_config=generation_config, + attention_mask=inputs.attention_mask, + max_length=model.config.neuron_config.max_length, + ) + output_tokens = tokenizer.batch_decode( + outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False + ) + print("Generated output:") + for i, output_token in enumerate(output_tokens): + print(f"Output {i}: {output_token}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--skip-compile", action="store_true", help="Skip compilation, load from cache" + ) + args = parser.parse_args() + generate(skip_compile=args.skip_compile) diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/qwen3_coder_vllm.sh b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/qwen3_coder_vllm.sh new file mode 100755 index 00000000..b07fad41 --- /dev/null +++ b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/src/qwen3_coder_vllm.sh @@ -0,0 +1,59 @@ +#!/bin/bash +# Qwen3-Coder-480B-A35B-Instruct: vLLM serving on trn2.48xlarge +# +# Throughput-optimized configuration: +# - seq_len=8192, batch_size=16, pure TP=64 +# - Auto-bucketing (128, 256, 512, 1024, 2048, 4096, 8192) +# - All NKI QKV kernels + fused RoPE enabled +# - No on_device_sampling (vLLM samples on CPU for better throughput) +# +# First launch: ~22 min compile + ~10 min weight load +# Subsequent launches: ~10 min (load from cache) +# +# Usage: +# ./qwen3_coder_vllm.sh # Use default model path +# MODEL_PATH=/path/to/model ./qwen3_coder_vllm.sh # Custom model path +# CONFIG=long_context ./qwen3_coder_vllm.sh # Use 16K context config + +set -euo pipefail + +MODEL_PATH="${MODEL_PATH:-/mnt/nvme/Qwen3-Coder-480B-A35B-Instruct/}" +CACHE_DIR="${CACHE_DIR:-/mnt/nvme/neuron_cache}" +PORT="${PORT:-8000}" +CONFIG="${CONFIG:-throughput}" + +source /opt/aws_neuronx_venv_pytorch_inference_vllm_0_13/bin/activate + +export NEURON_COMPILED_ARTIFACTS="$CACHE_DIR" +export NEURON_CC_FLAGS="--retry_failed_compilation" +export VLLM_NEURON_FRAMEWORK='neuronx-distributed-inference' +export MALLOC_ARENA_MAX=64 + +mkdir -p "$CACHE_DIR" + +if [ "$CONFIG" = "long_context" ]; then + echo "=== Qwen3-Coder-480B: Long Context Config (16384, BS=8) ===" + ADDITIONAL_CONFIG='{"override_neuron_config": {"async_mode": true, "attn_kernel_enabled": true, "batch_size": 8, "cc_pipeline_tiling_factor": 2, "context_encoding_buckets": [16384], "cp_degree": 1, "ctx_batch_size": 1, "enable_bucketing": true, "flash_decoding_enabled": false, "fused_qkv": true, "is_continuous_batching": true, "logical_nc_config": 2, "max_context_length": 16384, "mode_mask_padded_tokens": true, "moe_ep_degree": 1, "moe_tp_degree": 64, "on_device_sampling_config": {"do_sample": true, "temperature": 0.6, "top_k": 20, "top_p": 0.95}, "qkv_cte_nki_kernel_fuse_rope": true, "qkv_kernel_enabled": true, "qkv_nki_kernel_enabled": true, "scratch_pad_size": 1024, "seq_len": 16384, "sequence_parallel_enabled": true, "token_generation_buckets": [16384], "torch_dtype": "bfloat16", "tp_degree": 64}}' + MAX_NUM_SEQS=8 + MAX_MODEL_LEN=16384 +else + echo "=== Qwen3-Coder-480B: Throughput Optimized Config (8192, BS=16) ===" + ADDITIONAL_CONFIG='{"override_neuron_config": {"async_mode": true, "attn_kernel_enabled": true, "batch_size": 16, "cc_pipeline_tiling_factor": 2, "cp_degree": 1, "ctx_batch_size": 1, "enable_bucketing": true, "flash_decoding_enabled": false, "fused_qkv": true, "is_continuous_batching": true, "logical_nc_config": 2, "max_context_length": 8192, "mode_mask_padded_tokens": true, "moe_ep_degree": 1, "moe_tp_degree": 64, "qkv_cte_nki_kernel_fuse_rope": true, "qkv_kernel_enabled": true, "qkv_nki_kernel_enabled": true, "scratch_pad_size": 1024, "seq_len": 8192, "sequence_parallel_enabled": true, "torch_dtype": "bfloat16", "tp_degree": 64}}' + MAX_NUM_SEQS=16 + MAX_MODEL_LEN=8192 +fi + +echo "Model: $MODEL_PATH" +echo "Cache: $CACHE_DIR" +echo "Port: $PORT" +echo "Time: $(date -u)" + +python -u -m vllm.entrypoints.openai.api_server \ + --model="$MODEL_PATH" \ + --tensor-parallel-size=64 \ + --max-num-seqs="$MAX_NUM_SEQS" \ + --max-model-len="$MAX_MODEL_LEN" \ + --additional-config="$ADDITIONAL_CONFIG" \ + --no-enable-chunked-prefill \ + --no-enable-prefix-caching \ + --port="$PORT" diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/__init__.py b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/integration/__init__.py b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/integration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/integration/test_model.py b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/integration/test_model.py new file mode 100644 index 00000000..424ad7ce --- /dev/null +++ b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/integration/test_model.py @@ -0,0 +1,337 @@ +#!/usr/bin/env python3 +""" +Integration tests for Qwen3-Coder-480B-A35B-Instruct on vLLM/Neuron. + +Tests generation quality, TTFT, throughput, and concurrent serving against +a running vLLM server. This model uses the existing qwen3_moe architecture +in NxD Inference, so we test via the vLLM OpenAI-compatible API. + +Prerequisites: + - vLLM server running with Qwen3-Coder-480B (see src/qwen3_coder_vllm.sh) + +Usage: + # With pytest: + pytest test/integration/test_model.py -v --capture=tee-sys + + # Standalone: + python test/integration/test_model.py +""" + +import json +import os +import sys +import threading +import time + +import pytest +import requests + +API_URL = os.environ.get("VLLM_API_URL", "http://localhost:8000") +MODEL = os.environ.get("VLLM_MODEL", "/mnt/nvme/Qwen3-Coder-480B-A35B-Instruct/") + +# Thresholds (conservative for 480B MoE on trn2.48xlarge) +TTFT_THRESHOLD_S = 5.0 # Max TTFT for short prompts (s) +THROUGHPUT_THRESHOLD = 5.0 # Min single-request throughput (tok/s) +CONCURRENT_TPS_THRESHOLD = 20 # Min aggregate TPS at 4 concurrent + + +def _check_server(): + """Check if vLLM server is running.""" + try: + resp = requests.get(f"{API_URL}/v1/models", timeout=5) + return resp.status_code == 200 + except Exception: + return False + + +@pytest.fixture(scope="module", autouse=True) +def check_server(): + """Skip all tests if server is not running.""" + if not _check_server(): + pytest.skip(f"vLLM server not running at {API_URL}") + + +@pytest.fixture(scope="module") +def warmup(): + """Warmup request to ensure model is ready.""" + resp = requests.post( + f"{API_URL}/v1/completions", + json={"model": MODEL, "prompt": "Hello", "max_tokens": 16, "temperature": 0.1}, + timeout=120, + ) + assert resp.status_code == 200, f"Warmup failed: {resp.text}" + return resp.json() + + +def test_smoke(warmup): + """Test that server responds and model generates output.""" + data = warmup + assert "choices" in data + assert len(data["choices"]) > 0 + assert len(data["choices"][0]["text"]) > 0 + print(f" Smoke test: {data['usage']['completion_tokens']} tokens generated") + + +def test_generation_quality_fibonacci(warmup): + """Test that model generates correct Fibonacci code.""" + resp = requests.post( + f"{API_URL}/v1/completions", + json={ + "model": MODEL, + "prompt": "Write a Python function to compute the nth Fibonacci number using memoization:\n```python\n", + "max_tokens": 256, + "temperature": 0.0, + }, + timeout=120, + ) + assert resp.status_code == 200 + text = resp.json()["choices"][0]["text"] + + # Should contain function definition and memoization-related keywords + assert "def " in text or "fibonacci" in text.lower(), ( + f"No function definition found: {text[:200]}" + ) + # Should not be garbage (repetitive single chars) + unique_chars = len(set(text.strip())) + assert unique_chars > 5, ( + f"Output appears to be garbage (only {unique_chars} unique chars)" + ) + print(f" Fibonacci: {len(text)} chars, coherent") + + +def test_generation_quality_math(warmup): + """Test basic math reasoning.""" + resp = requests.post( + f"{API_URL}/v1/completions", + json={ + "model": MODEL, + "prompt": "What is 2 + 2? Answer with just the number:", + "max_tokens": 8, + "temperature": 0.0, + }, + timeout=120, + ) + assert resp.status_code == 200 + text = resp.json()["choices"][0]["text"].strip() + assert "4" in text, f"Expected '4' in response, got: '{text}'" + print(f" Math: '{text}'") + + +def test_generation_quality_translation(warmup): + """Test translation capability.""" + resp = requests.post( + f"{API_URL}/v1/completions", + json={ + "model": MODEL, + "prompt": "Translate to French: 'The cat sat on the mat.'", + "max_tokens": 32, + "temperature": 0.0, + }, + timeout=120, + ) + assert resp.status_code == 200 + text = resp.json()["choices"][0]["text"].strip().lower() + # Should contain French words + french_indicators = ["le", "la", "chat", "sur", "tapis", "assis", "mat"] + has_french = any(word in text for word in french_indicators) + assert has_french, f"No French detected in: '{text}'" + print(f" Translation: '{text[:100]}'") + + +def test_ttft_short_prompt(warmup): + """Test TTFT for a short prompt (should use small bucket with auto-bucketing).""" + payload = { + "model": MODEL, + "prompt": "Hello world", + "max_tokens": 32, + "temperature": 0.1, + "stream": True, + } + start = time.time() + ttft = None + resp = requests.post( + f"{API_URL}/v1/completions", json=payload, timeout=120, stream=True + ) + for line in resp.iter_lines(): + if line: + decoded = line.decode("utf-8") + if decoded.startswith("data: ") and decoded != "data: [DONE]": + if ttft is None: + ttft = time.time() - start + break + + assert ttft is not None, "No streaming tokens received" + assert ttft < TTFT_THRESHOLD_S, ( + f"TTFT {ttft:.2f}s exceeds {TTFT_THRESHOLD_S}s threshold" + ) + print(f" TTFT (short prompt): {ttft:.3f}s (threshold: {TTFT_THRESHOLD_S}s)") + + +def test_throughput_single_request(warmup): + """Test single-request decode throughput.""" + start = time.time() + resp = requests.post( + f"{API_URL}/v1/completions", + json={ + "model": MODEL, + "prompt": "Write a Python hello world program", + "max_tokens": 128, + "temperature": 0.1, + }, + timeout=120, + ) + elapsed = time.time() - start + + assert resp.status_code == 200 + tokens = resp.json()["usage"]["completion_tokens"] + tps = tokens / elapsed if elapsed > 0 else 0 + + assert tps > THROUGHPUT_THRESHOLD, ( + f"Throughput {tps:.2f} tok/s below {THROUGHPUT_THRESHOLD} tok/s threshold" + ) + print(f" Throughput: {tps:.2f} tok/s ({tokens} tokens in {elapsed:.2f}s)") + + +def test_concurrent_throughput(warmup): + """Test aggregate throughput with 4 concurrent requests.""" + prompts = [ + "Write a Python function to compute Fibonacci numbers.", + "Explain what tensor parallelism is in 3 sentences.", + "Write a hello world program in Rust.", + "What is 127 * 389? Show your work.", + ] + results = [None] * len(prompts) + + def send(idx, prompt): + try: + data = json.dumps( + { + "model": MODEL, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 256, + "temperature": 0.6, + } + ).encode() + import urllib.request + + req = urllib.request.Request( + f"{API_URL}/v1/chat/completions", + data=data, + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=300) as r: + result = json.loads(r.read().decode()) + results[idx] = { + "completion_tokens": result["usage"]["completion_tokens"], + "ok": len(result["choices"][0]["message"]["content"].strip()) > 10, + } + except Exception as e: + results[idx] = {"error": str(e)} + + wall_start = time.time() + threads = [ + threading.Thread(target=send, args=(i, p)) for i, p in enumerate(prompts) + ] + for t in threads: + t.start() + for t in threads: + t.join(timeout=300) + wall_elapsed = time.time() - wall_start + + valid = [r for r in results if r and "error" not in r] + total_tokens = sum(r["completion_tokens"] for r in valid) + aggregate_tps = total_tokens / wall_elapsed if wall_elapsed > 0 else 0 + all_ok = all(r.get("ok", False) for r in valid) + + assert len(valid) == len(prompts), ( + f"Only {len(valid)}/{len(prompts)} requests succeeded" + ) + assert all_ok, "Some responses had quality issues" + assert aggregate_tps > CONCURRENT_TPS_THRESHOLD, ( + f"Aggregate TPS {aggregate_tps:.2f} below {CONCURRENT_TPS_THRESHOLD} threshold" + ) + print( + f" Concurrent (4): {aggregate_tps:.2f} agg tok/s, {total_tokens} tokens, " + f"{wall_elapsed:.2f}s wall time" + ) + + +def test_chat_completions_api(warmup): + """Test the chat completions API endpoint.""" + resp = requests.post( + f"{API_URL}/v1/chat/completions", + json={ + "model": MODEL, + "messages": [{"role": "user", "content": "What is Python?"}], + "max_tokens": 64, + "temperature": 0.6, + }, + timeout=120, + ) + assert resp.status_code == 200 + data = resp.json() + assert "choices" in data + content = data["choices"][0]["message"]["content"] + assert len(content.strip()) > 10, f"Response too short: '{content}'" + print(f" Chat API: {data['usage']['completion_tokens']} tokens") + + +if __name__ == "__main__": + print("=" * 70) + print("Qwen3-Coder-480B-A35B-Instruct Integration Tests") + print(f"Server: {API_URL}") + print(f"Model: {MODEL}") + print("=" * 70) + + if not _check_server(): + print(f"\nERROR: vLLM server not running at {API_URL}") + print("Start the server first: ./src/qwen3_coder_vllm.sh") + sys.exit(1) + + # Warmup + print("\n--- Warmup ---") + resp = requests.post( + f"{API_URL}/v1/completions", + json={"model": MODEL, "prompt": "Hello", "max_tokens": 16, "temperature": 0.1}, + timeout=120, + ) + assert resp.status_code == 200, f"Warmup failed: {resp.text}" + warmup_data = resp.json() + print(f" OK: {warmup_data['usage']['completion_tokens']} tokens") + + # Run tests + tests = [ + ("Smoke Test", lambda: test_smoke(warmup_data)), + ("Fibonacci Quality", lambda: test_generation_quality_fibonacci(warmup_data)), + ("Math Quality", lambda: test_generation_quality_math(warmup_data)), + ( + "Translation Quality", + lambda: test_generation_quality_translation(warmup_data), + ), + ("TTFT (short prompt)", lambda: test_ttft_short_prompt(warmup_data)), + ( + "Single-Request Throughput", + lambda: test_throughput_single_request(warmup_data), + ), + ("Concurrent Throughput (4x)", lambda: test_concurrent_throughput(warmup_data)), + ("Chat Completions API", lambda: test_chat_completions_api(warmup_data)), + ] + + passed = 0 + failed = 0 + for name, test_fn in tests: + print(f"\n--- {name} ---") + try: + test_fn() + passed += 1 + except AssertionError as e: + print(f" FAIL: {e}") + failed += 1 + except Exception as e: + print(f" ERROR: {e}") + failed += 1 + + print(f"\n{'=' * 70}") + print(f"Results: {passed} passed, {failed} failed") + print(f"{'=' * 70}") + sys.exit(0 if failed == 0 else 1) diff --git a/contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/unit/__init__.py b/contrib/models/Qwen3-Coder-480B-A35B-Instruct/test/unit/__init__.py new file mode 100644 index 00000000..e69de29b