forked from fuergaosi233/claude-code-proxy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_cancellation.py
More file actions
130 lines (109 loc) · 4.76 KB
/
test_cancellation.py
File metadata and controls
130 lines (109 loc) · 4.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
"""
Test script for HTTP request cancellation functionality.
This script demonstrates how client disconnection cancels ongoing requests.
"""
import asyncio
import httpx
import json
import time
async def test_non_streaming_cancellation():
    """Exercise client-side cancellation of a non-streaming request.

    Posts a ``/v1/messages`` request to the proxy on ``localhost:8082`` as a
    background task, cancels that task after two seconds, and prints whether
    awaiting it raised ``asyncio.CancelledError``. Any other exception
    (connection failure, HTTP client error, ...) is caught and reported
    instead of being propagated, so the remaining tests can still run.
    """
    print("🧪 Testing non-streaming request cancellation...")

    async with httpx.AsyncClient(timeout=30) as client:
        try:
            # Run the request as a task so it can be cancelled from here
            # while it is still in flight.
            task = asyncio.create_task(
                client.post(
                    "http://localhost:8082/v1/messages",
                    json={
                        "model": "claude-3-5-sonnet-20241022",
                        "max_tokens": 1000,
                        "messages": [
                            {"role": "user", "content": "Write a very long story about a journey through space that takes at least 500 words."}
                        ],
                    },
                )
            )

            # Give the request two seconds to get under way, then cancel it.
            await asyncio.sleep(2)
            task.cancel()

            try:
                await task
                # Reaching this line means the task completed normally,
                # i.e. the cancellation did not take effect.
                print("❌ Request should have been cancelled")
            except asyncio.CancelledError:
                # CancelledError is not an Exception subclass on modern
                # Python, so it must be handled explicitly here.
                print("✅ Non-streaming request cancelled successfully")

        except Exception as e:
            print(f"❌ Non-streaming test error: {e}")
async def test_streaming_cancellation():
    """Exercise client-side disconnection of a streaming request.

    Opens a streaming ``/v1/messages`` request against the proxy on
    ``localhost:8082``, prints the first three non-blank lines it receives,
    and then leaves the read loop. Exiting the ``client.stream`` context
    afterwards closes the response, which is what simulates the client
    disconnecting. A non-200 status is reported as a failure; any exception
    is caught and printed rather than propagated.
    """
    print("\n🧪 Testing streaming request cancellation...")

    async with httpx.AsyncClient(timeout=30) as client:
        try:
            # Start streaming request
            async with client.stream(
                "POST",
                "http://localhost:8082/v1/messages",
                json={
                    "model": "claude-3-5-sonnet-20241022",
                    "max_tokens": 1000,
                    "messages": [
                        {"role": "user", "content": "Write a very long story about a journey through space that takes at least 500 words."}
                    ],
                    "stream": True,
                },
            ) as response:
                if response.status_code == 200:
                    print("✅ Streaming request started successfully")

                    # Read a few chunks, then simulate a client disconnect.
                    chunk_count = 0
                    async for line in response.aiter_lines():
                        # Blank lines are skipped and do not count as chunks.
                        if line.strip():
                            chunk_count += 1
                            print(f"📦 Received chunk {chunk_count}: {line[:100]}...")

                            # Stop reading after 3 chunks; leaving the
                            # stream context then closes the response.
                            if chunk_count >= 3:
                                print("🔌 Simulating client disconnect...")
                                break

                    print("✅ Streaming request cancelled successfully")
                else:
                    print(f"❌ Streaming request failed: {response.status_code}")

        except Exception as e:
            print(f"❌ Streaming test error: {e}")
async def test_server_running() -> bool:
    """Check whether the proxy server answers its health endpoint.

    Sends ``GET http://localhost:8082/health`` with a 5-second timeout.

    Returns:
        ``True`` if the response status is 200. ``False`` if the status is
        anything else, or if the request raised (in which case the error
        and a hint on how to start the server are printed).
    """
    print("🔍 Checking if server is running...")

    try:
        async with httpx.AsyncClient(timeout=5) as client:
            response = await client.get("http://localhost:8082/health")
            if response.status_code != 200:
                print(f"❌ Server health check failed: {response.status_code}")
                return False

            print("✅ Server is running and healthy")
            return True
    except Exception as e:
        print(f"❌ Cannot connect to server: {e}")
        print("💡 Make sure to start the server with: python start_proxy.py")
        return False
async def main():
    """Run the cancellation test sequence against the local proxy.

    Performs the server health check first and returns immediately if it
    fails. Otherwise runs the non-streaming test followed by the streaming
    test, then prints a summary and a note about what the observed
    behaviour depends on.
    """
    print("🚀 Starting HTTP request cancellation tests")
    print("=" * 50)

    # Nothing else can run without a reachable server.
    if not await test_server_running():
        return

    print("\n" + "=" * 50)

    # Test non-streaming cancellation
    await test_non_streaming_cancellation()

    # Test streaming cancellation
    await test_streaming_cancellation()

    print("\n" + "=" * 50)
    print("✅ All cancellation tests completed!")
    print("\n💡 Note: The actual cancellation behavior depends on:")
    print(" - Client implementation (httpx in this case)")
    print(" - Network conditions")
    print(" - Server response to client disconnection")
    print(" - Whether the underlying OpenAI API supports cancellation")
# Script entry point: run the async test sequence only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())