1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

25 changes: 22 additions & 3 deletions Justfile
@@ -163,7 +163,10 @@ ci-setup-macos: ensure-rust ensure-uv
ci-setup-fedora python_version="3.12": ensure-uv
#!/usr/bin/env bash
export PATH="$HOME/.local/bin:$PATH"
dnf install -y python{{python_version}}
# Install build dependencies
dnf install -y gcc gcc-c++ openssl-devel
# Use uv to install Python (consistent with manylinux setup)
uv python install {{python_version}}
uv tool install pytest
echo "==> Setup complete!"

@@ -193,8 +196,24 @@ ci-build manylinux="":
ci-test:
#!/usr/bin/env bash
export PATH="$HOME/.local/bin:$PATH"
pip install dist/*.whl pytest pytest-asyncio 2>/dev/null || uv pip install --system dist/*.whl pytest pytest-asyncio
python3 -m pytest tests/python -v
# Install wheel and dependencies using uv (preferred) or pip
if command -v uv &> /dev/null; then
uv pip install --system dist/*.whl pytest pytest-asyncio
# Use uv run pytest (uses uv-managed Python environment)
uv run pytest tests/python -v
else
# Fallback to pip if uv not available
pip install dist/*.whl pytest pytest-asyncio
# Try to find python executable
for py in python3 python3.12 python3.11 python3.10 python; do
if command -v $py &> /dev/null; then
$py -m pytest tests/python -v
exit 0
fi
done
echo "Error: No Python interpreter found"
exit 1
fi

# =============================================================================
# Simulate the CI pipeline locally (Action commands)
126 changes: 74 additions & 52 deletions benchmarks/PERFORMANCE_COMPARISON_REPORT.md
@@ -4,16 +4,17 @@

This report compares the performance of two distributed Actor frameworks, **Ray** and **Pulsing**, under identical workloads.

**Key findings** (based on a fair single-process comparison):
**Key findings** (based on a fair single-process comparison, with Ray using Generators):

| Metric | Pulsing advantage |
|------|-------------|
| Single-request average latency | **100× faster** (2.65ms vs 264.74ms) |
| Single-request P99 latency | **319× faster** (22ms vs 7,083ms) |
| Streaming P99 latency | **10.8× faster** (976ms vs 10,548ms) |
| Throughput | **2.7× higher** (1,446 vs 530 requests) |
| Single-request average latency | **467× faster** (1.41ms vs 659.30ms) |
| Single-request P99 latency | **3,415× faster** (3.85ms vs 13,156ms) |
| Streaming average latency | **9.3× faster** (112.70ms vs 1,044.85ms) |
| Streaming P99 latency | **91× faster** (175ms vs 15,949ms) |
| Total throughput | **17.8× higher** (6,715 vs 378 operations) |

**Conclusion**: Pulsing significantly outperforms Ray in low-latency, high-throughput scenarios and is well suited to latency-sensitive applications such as real-time inference serving.
**Conclusion**: Even with Ray using Generators for streaming, Pulsing still significantly outperforms Ray in low-latency, high-throughput scenarios, and remains well suited to latency-sensitive applications such as real-time inference serving.

---

@@ -82,25 +83,27 @@ result = await actor.echo.remote("hello")  # returns an ObjectRef, automatically unwrapped

### 1.4 Streaming Comparison

| Dimension | Pulsing | Ray |
|------|---------|-----|
| **Implementation** | `StreamMessage` + `StreamReader` | Returns `List[Dict]` |
| **Data transfer** | Chunked streaming (consumed as produced) | Full list returned at once |
| **Time to first byte** | Available as soon as the first chunk is produced | Must wait for all chunks to be generated |
| **Memory footprint** | Buffers only the current chunk | Must buffer the complete result |
| Dimension | Pulsing | Ray (corrected) |
|------|---------|--------------|
| **Implementation** | `StreamMessage` + `StreamReader` | Ray Generators (`yield`) |
| **Data transfer** | Chunked streaming (consumed as produced) | Chunked streaming (via `async for`) |
| **Time to first byte** | Available as soon as the first chunk is produced | Available as soon as the first chunk is produced |
| **Memory footprint** | Buffers only the current chunk | Buffers only the current chunk (ObjectRef) |

```
Pulsing streaming:
  Producer: [chunk1] → [chunk2] → [chunk3] → ... → [done]
  Consumer:     ↓          ↓          ↓
            process1   process2   process3   (processed as received)

Ray list return:
  Producer: [chunk1, chunk2, chunk3, ...] returned all at once after everything completes
  Consumer:
            receives everything at once
Ray Generators (corrected):
  Producer: [chunk1] → [chunk2] → [chunk3] → ... → [done]
  Consumer:
            process1   process2   process3   (processed as received)
```

**Note**: The corrected Ray benchmark uses Ray Generators for true streaming, which is semantically equivalent to Pulsing's streaming.
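
For reference, a minimal Python sketch of this corrected pattern, mirroring the `StreamWorker` / `async for` usage in the benchmark diff later in this PR (the `consume` helper and its arguments are illustrative, not part of the benchmark):

```python
import asyncio
import ray


@ray.remote
class StreamWorker:
    """Actor whose method is a Ray generator: each `yield` ships one chunk."""

    async def generate_stream(self, count: int, delay: float):
        for i in range(count):
            await asyncio.sleep(delay)  # simulate per-chunk work
            yield {"index": i, "value": f"item_{i}"}


async def consume(worker, count: int = 10, delay: float = 0.05) -> int:
    chunks = 0
    # Each yielded value arrives as an ObjectRef; await it to get the chunk.
    async for ref in worker.generate_stream.remote(count, delay):
        _chunk = await ref  # first chunk is usable after roughly one delay
        chunks += 1
    return chunks


if __name__ == "__main__":
    ray.init()
    worker = StreamWorker.remote()
    print(asyncio.run(consume(worker)))  # prints 10
```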

---

## 2. Key Design Differences
@@ -116,14 +119,16 @@

**Impact**: Pulsing's call path is shorter, so single-request latency is significantly lower.

### 2.2 Difference B: Streaming Semantics
### 2.2 Difference B: Streaming Semantics (corrected)

| Scenario | Pulsing | Ray |
|------|---------|-----|
| Generate 10 items, 50ms delay each | TTFT ≈ 50ms, total latency ≈ 500ms | Total latency ≈ 500ms (nothing available early) |
| P99 tail latency | Lower (amortized by streaming) | Higher (must wait for everything to finish) |
| Scenario | Pulsing | Ray (corrected) |
|------|---------|--------------|
| Generate 10 items, 50ms delay each | TTFT ≈ 50ms, total latency ≈ 500ms | TTFT ≈ 50ms, total latency ≈ 500ms |
| P99 tail latency | Lower (175ms) | Higher (15,949ms) |

**Impact**: In scenarios such as LLM inference, Pulsing delivers a better user experience (the first token arrives sooner).
**Impact**:
- Although both now implement true streaming, Ray's underlying architecture (Object Store + serialization) causes more severe latency and tail-latency problems
- In scenarios such as LLM inference, Pulsing delivers a better user experience (lower latency and a more stable P99); a timing sketch follows below
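
As an illustration of the TTFT arithmetic in the table above, a framework-agnostic asyncio sketch (plain Python, no Ray or Pulsing APIs; the 10-item / 50ms parameters are assumptions mirroring the scenario):

```python
import asyncio
import time


async def produce(count: int = 10, delay: float = 0.05):
    """Emit one item every `delay` seconds, like the 10-item / 50ms scenario."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield i


async def measure() -> None:
    start = time.perf_counter()
    ttft = None
    async for _ in produce():
        if ttft is None:
            ttft = time.perf_counter() - start  # time to first item
    total = time.perf_counter() - start
    print(f"TTFT ≈ {ttft * 1000:.0f}ms, total ≈ {total * 1000:.0f}ms")


asyncio.run(measure())  # expected: TTFT ≈ 50ms, total ≈ 500ms
```

With true streaming both frameworks reach TTFT ≈ 50ms; the remaining gap measured in this report comes from per-chunk transport overhead rather than streaming semantics.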

### 2.3 Difference C: Runtime Model

@@ -189,38 +194,41 @@
### 4.1 Single-Process Mode (Fair Comparison) ✅

> **Test conditions**: 30 seconds, 100 req/s, 50 workers per type, single process
>
> **Important update**: The Ray benchmark has been corrected to use Ray Generators for true streaming, ensuring a fair comparison.

#### Single-Request Performance

| Metric | Ray | Pulsing | Pulsing advantage |
|------|----:|--------:|-------------:|
| Total requests | 530 | 1,446 | **2.7×** |
| Total requests | 254 | 4,734 | **18.6×** |
| Success rate | 100% | 100% | — |
| Average latency | 264.74 ms | 2.65 ms | **100× lower** |
| P50 latency | 14.62 ms | 0.99 ms | **15× lower** |
| P95 latency | 328.78 ms | 11.12 ms | **30× lower** |
| P99 latency | 7,083.10 ms | 22.19 ms | **319× lower** |
| Average latency | 659.30 ms | 1.41 ms | **467× lower** |
| P50 latency | 265.43 ms | 1.23 ms | **216× lower** |
| P95 latency | 1,764.99 ms | 3.00 ms | **588× lower** |
| P99 latency | 13,156.18 ms | 3.85 ms | **3,415× lower** |

**Analysis**:
- Ray's P99 latency reaches 7 seconds, indicating a severe tail-latency problem, possibly related to Object Store contention or GC
- Pulsing's P99 is only 22ms, with a very stable latency distribution
- In the same window, Pulsing processes 2.7× as many requests as Ray
- Ray's P99 latency reaches 13 seconds, indicating a severe tail-latency problem, possibly related to Object Store contention, serialization overhead, or scheduling delays
- Pulsing's P99 is only 3.85ms, with a very stable latency distribution and almost no tail
- In the same window, Pulsing processes 18.6× as many requests as Ray, a significant throughput advantage

#### Streaming Performance
#### Streaming Performance (with Ray Generators)

| Metric | Ray | Pulsing | Pulsing advantage |
|------|----:|--------:|-------------:|
| Total streams | 252 | 654 | **2.6×** |
| Total streams | 124 | 1,981 | **16.0×** |
| Success rate | 100% | 100% | — |
| Average latency | 605.00 ms | 420.01 ms | **30% lower** |
| P50 latency | 424.99 ms | 370.06 ms | **13% lower** |
| P95 latency | 914.60 ms | 874.89 ms | Slightly lower |
| P99 latency | 10,547.73 ms | 975.80 ms | **10.8× lower** |
| Average latency | 1,044.85 ms | 112.70 ms | **9.3× lower** |
| P50 latency | 385.90 ms | 112.21 ms | **3.4× lower** |
| P95 latency | 3,588.20 ms | 168.56 ms | **21.3× lower** |
| P99 latency | 15,949.15 ms | 175.00 ms | **91× lower** |

**Analysis**:
- Ray's streaming P99 exceeds 10 seconds, severely degrading the user experience
- Pulsing keeps streaming P99 under 1 second, better suited to real-time scenarios
- The gap mainly comes from different streaming semantics: Pulsing's true streaming vs Ray's list return
- Even with Ray Generators for streaming, Ray's streaming P99 still exceeds 15 seconds, severely degrading the user experience
- Pulsing keeps streaming P99 within 175ms, better suited to real-time scenarios
- Although both now implement true streaming, Pulsing's latency and throughput still significantly outperform Ray's
- The gap mainly comes from the underlying architecture: Pulsing's direct message passing vs Ray's Object Store plus serialization overhead

---

@@ -261,24 +269,24 @@

## 5. Conclusion

### 5.1 Performance Summary
### 5.1 Performance Summary (corrected, Ray using Generators)

| Dimension | Ray | Pulsing | Ratio |
|------|----:|--------:|---------:|
| Single-request average latency | 264.74 ms | 2.65 ms | **100×** |
| Single-request P99 latency | 7,083 ms | 22 ms | **319×** |
| Streaming average latency | 605 ms | 420 ms | **1.4×** |
| Streaming P99 latency | 10,548 ms | 976 ms | **10.8×** |
| Throughput (single requests) | 530 | 1,446 | **2.7×** |
| Single-request average latency | 659.30 ms | 1.41 ms | **467×** |
| Single-request P99 latency | 13,156 ms | 3.85 ms | **3,415×** |
| Streaming average latency | 1,044.85 ms | 112.70 ms | **9.3×** |
| Streaming P99 latency | 15,949 ms | 175 ms | **91×** |
| Total throughput (requests + streams) | 378 | 6,715 | **17.8×** |

### 5.2 Attribution of Differences

| Difference | Cause |
|------|------|
| 100× single-request latency | Pulsing's direct message passing vs Ray's Object Store + Raylet scheduling |
| Huge P99 tail latency | Ray's Object Store GC and scheduling contention cause long tails |
| Streaming latency gap | Pulsing's true streaming (earlier TTFT) vs Ray's one-shot return |
| Throughput gap | Pulsing's lower call overhead supports higher concurrency |
| 467× single-request latency | Pulsing's direct message passing (JSON serialization) vs Ray's Object Store + Pickle serialization + Raylet scheduling |
| Huge P99 tail latency (3,415×) | Ray's Object Store GC, serialization overhead, and scheduling contention cause severe long tails |
| Streaming latency gap (9.3×) | Both stream, but Pulsing's per-message overhead is far lower than Ray's ObjectRef mechanism |
| Throughput gap (17.8×) | Pulsing's lower call overhead and more efficient concurrency model support higher throughput |

### 5.3 Recommended Use Cases

Expand All @@ -293,9 +301,14 @@ Ray 列表返回:

### 5.4 Final Conclusion

> **Pulsing significantly outperforms Ray in latency-sensitive scenarios**, with 100× faster single-request latency and 319× faster P99 latency.
> **Even with Ray using Generators for streaming, Pulsing still significantly outperforms Ray in latency-sensitive scenarios**:
> - Single-request latency **467× faster** (1.41ms vs 659.30ms)
> - Single-request P99 latency **3,415× faster** (3.85ms vs 13,156ms)
> - Streaming average latency **9.3× faster** (112.70ms vs 1,044.85ms)
> - Streaming P99 latency **91× faster** (175ms vs 15,949ms)
> - Total throughput **17.8× higher** (6,715 vs 378 operations)
>
> For Actor systems that need low latency and high throughput (such as inference serving and real-time APIs), **Pulsing is recommended**.
> For Actor systems that need low latency and high throughput (such as inference serving and real-time APIs), **Pulsing is strongly recommended**.
>
> For large-scale data-processing workloads that need a rich ecosystem and complex scheduling, **Ray remains the better choice**.

@@ -327,7 +340,16 @@ DURATION=60 RATE=200 NUM_WORKERS=100 ./benchmarks/run_stress_test_ray_single.sh

| Script | Description |
|------|------|
| `large_scale_stress_test_ray_single.py` | Ray single-process test |
| `large_scale_stress_test_ray_single.py` | Ray single-process test (corrected: uses Ray Generators) |
| `large_scale_stress_test_pulsing_single.py` | Pulsing single-process test |
| `large_scale_stress_test_ray.py` | Ray multi-process test (torchrun) |
| `large_scale_stress_test.py` | Pulsing multi-process test (torchrun) |

### D. Benchmark Correction Notes

**Ray benchmark corrections** (2025-01-25):
- ✅ `StreamWorker` now uses Ray Generators (`yield`) for true streaming
- ✅ The caller now consumes streaming results with `async for` over the Ray Generator
- ✅ Parameters are aligned with the Pulsing benchmark (count: 5-15, delay: 0.01)

The corrected results better reflect the real performance gap between the two frameworks and ensure a fair comparison.
38 changes: 20 additions & 18 deletions benchmarks/large_scale_stress_test_ray_single.py
@@ -1,10 +1,12 @@
#!/usr/bin/env python3
"""
Ray Stress Test Script - Single Process Version (Correct Ray Usage)
Ray Stress Test Script - Single Process Version (Correct Ray Usage with Generators)

Ray is designed around a single driver process plus multiple Actors, so it should not be run in torchrun multi-process mode.
This script creates multiple Actors within a single process, simulating equivalent load to Pulsing.

This version uses Ray Generators for streaming, providing a fair comparison with Pulsing's streaming.

Usage:
python benchmarks/large_scale_stress_test_ray_single.py \
--duration 300 \
@@ -150,20 +152,17 @@

@ray.remote
class StreamWorker:
"""Stream Worker - Streamed response"""
"""Stream Worker - Streamed response using Ray Generators"""

async def generate_stream(self, count: int, delay: float) -> list[dict]:
result = []
async def generate_stream(self, count: int, delay: float):
"""Generate stream using yield (Ray Generator)"""
for i in range(count):
result.append(
{
"index": i,
"value": f"item_{i}",
"timestamp": time.time(),
}
)
await asyncio.sleep(delay)
return result
yield {
"index": i,
"value": f"item_{i}",
"timestamp": time.time(),
}


@ray.remote
@@ -268,22 +267,25 @@
return False

async def send_stream_request(self) -> bool:
"""Send a stream request"""
"""Send a stream request using Ray Generators (async for)"""
if "stream" not in self.workers or not self.workers["stream"]:
return False

worker = random.choice(self.workers["stream"])
start_time = time.time()

try:
count = random.randint(5, 20)
delay = random.uniform(0.01, 0.05)

stream_items = await worker.generate_stream.remote(count, delay)
count = random.randint(5, 15)
delay = 0.01

# Use async for to stream results from Ray Generator
# This is the correct way to consume Ray Generators in asyncio
chunk_count = 0
for _ in stream_items:
async for ref in worker.generate_stream.remote(count, delay):
# await the ObjectRef to get the actual value
item = await ref

Check notice (Code scanning / CodeQL): Unused local variable (Note, test)

Variable item is not used.

Copilot Autofix suggestion: the `await ref` is still required to realize each stream element and surface any exceptions, but the bound name is not used. Renaming `item` to `_item` keeps the await while marking the binding as intentionally unused; no imports or additional definitions are needed.

Suggested change in benchmarks/large_scale_stress_test_ray_single.py:

@@ -283,7 +283,7 @@
             chunk_count = 0
             async for ref in worker.generate_stream.remote(count, delay):
                 # await the ObjectRef to get the actual value
-                item = await ref
+                _item = await ref
                 chunk_count += 1
                 # Process item if needed (currently just counting)

chunk_count += 1
# Process item if needed (currently just counting)

latency_ms = (time.time() - start_time) * 1000
self.stats.add_stream(True, latency_ms)