Improve proxy server #4354
base: main
Conversation
Pull request overview
This PR refactors the proxy server implementation by extracting node management into a dedicated module, introducing shared aiohttp session management, and simplifying the proxy request handling flow for Hybrid vs DistServe serving modes.
Changes:
- Split proxy node management logic into a new `NodeManager`/`Node` implementation with a shared aiohttp `ClientSession`.
- Refactored proxy request handling into helper functions for the Hybrid and DistServe flows (prefill/decode + PD connection).
- Renamed the OpenAI API server context from `VariableInterface` to `ServerContext` and updated type hints/references accordingly.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| `lmdeploy/serve/proxy/proxy.py` | Refactors proxy request routing and the DistServe flow; introduces `AppSettings` and new node API models. |
| `lmdeploy/serve/proxy/node_manager.py` | New module implementing a shared aiohttp connector/session, `Node`, and `NodeManager` with routing strategies. |
| `lmdeploy/serve/openai/api_server.py` | Renames the global server context class to `ServerContext` and updates usages. |
| `lmdeploy/serve/openai/serving_generate.py` | Updates `check_request` typing to use `ServerContext`. |
| `lmdeploy/serve/openai/serving_completion.py` | Updates `check_request` typing to use `ServerContext`. |
| `lmdeploy/serve/openai/serving_chat_completion.py` | Updates `check_request` typing to use `ServerContext`. |
Comments suppressed due to low confidence (1)
lmdeploy/serve/openai/api_server.py:59
Renaming `VariableInterface` to `ServerContext` is a breaking change for in-repo callers (e.g., tests/imports) and any downstream code importing `VariableInterface`. Consider providing a backwards-compatible alias (`VariableInterface = ServerContext`) with deprecation messaging to avoid breaking existing imports.
```python
class ServerContext:
    """An IO interface maintaining variables."""
    async_engine: AsyncEngine = None
    api_keys: Optional[List[str]] = None
    request_hosts = []
```
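A minimal sketch of the suggested alias, placed at module level in `api_server.py`; the deprecation wording is illustrative, not the repo's actual code:

```python
import warnings

# Simplest form: a plain alias keeps `from ... import VariableInterface` working.
VariableInterface = ServerContext

# Alternatively, to emit a warning on first access, drop the plain alias above
# and rely on a PEP 562 module-level __getattr__ instead:
def __getattr__(name):
    if name == 'VariableInterface':
        warnings.warn('VariableInterface is deprecated; use ServerContext instead.',
                      DeprecationWarning, stacklevel=2)
        return ServerContext
    raise AttributeError(f'module {__name__!r} has no attribute {name!r}')
```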
```diff
 if api_keys is not None:
     if isinstance(api_keys, str):
         api_keys = api_keys.split(',')
-    from lmdeploy.serve.openai.api_server import VariableInterface
-    VariableInterface.api_keys = api_keys
+    app_settings.api_keys = api_keys
```
Copilot AI (Feb 12, 2026):
proxy() stores API keys in app_settings.api_keys, but the check_api_key dependency reads ServerContext.api_keys (from lmdeploy.serve.openai.api_server). As a result, proxy API key protection will be effectively disabled even when api_keys is provided. Set the keys on ServerContext.api_keys (or implement a proxy-local check_api_key that uses app_settings.api_keys).
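A minimal sketch of the proxy-local option; the dependency name `proxy_check_api_key` and the bearer-token handling are illustrative, assuming `app_settings.api_keys` holds the parsed key list:

```python
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

_bearer = HTTPBearer(auto_error=False)

async def proxy_check_api_key(auth: HTTPAuthorizationCredentials = Depends(_bearer)):
    """Validate the Bearer token against the proxy's own key list."""
    if not app_settings.api_keys:  # no keys configured: auth disabled
        return None
    if auth is None or auth.credentials not in app_settings.api_keys:
        raise HTTPException(status_code=401, detail='Invalid or missing API key')
    return auth.credentials
```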
```python
if node.status is None:
    from lmdeploy.serve.openai.api_client import APIClient
    node.status = Status(models=APIClient(api_server_url=node.url).available_models)
await node_manager.add(node.url, node.status)
```
Copilot AI (Feb 12, 2026):
add_node is an async endpoint but uses the synchronous APIClient (requests) to fetch /v1/models, which will block the event loop under load. Consider moving this call into a threadpool (run_in_threadpool) or switching to an async client (reusing the shared aiohttp session) to avoid proxy latency spikes.
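Two possible non-blocking variants, sketched under the assumption that the node's `/v1/models` endpoint returns an OpenAI-style `{"data": [{"id": ...}]}` payload and that the shared connector is reachable from this scope:

```python
from starlette.concurrency import run_in_threadpool

from lmdeploy.serve.openai.api_client import APIClient

# Option 1: keep the synchronous client, but run it off the event loop.
models = await run_in_threadpool(lambda: APIClient(api_server_url=node.url).available_models)

# Option 2: reuse the shared aiohttp session instead of the blocking client.
session = await connector.get_session()
async with session.get(f'{node.url}/v1/models') as resp:
    resp.raise_for_status()
    data = await resp.json()
    models = [m['id'] for m in data['data']]

node.status = Status(models=models)
```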
```diff
 async def terminate_node_all():
     """Terminate all nodes."""
-    try:
-        success = node_manager.terminate_all_nodes()
-        if not success:
-            return 'Failed to terminate all nodes'
-        return 'All nodes terminated successfully'
-    except:  # noqa
-        logger.error('Failed to terminate all nodes')
-        return 'Failed to terminate all nodes.'
+    await node_manager.terminate_all_nodes()
+    return JSONResponse(status_code=200, content={'message': 'All nodes terminated successfully'})
```
Copilot AI (Feb 12, 2026):
/nodes/terminate_all terminates nodes but does not deregister endpoints from pd_connection_pool. This can leave stale decode/prefill endpoints and connections behind. Consider deregistering each terminated node URL (or resetting the pool) after successful termination.
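A sketch of one fix, assuming `dereg_instance` (the helper `/nodes/remove` already uses) accepts a node URL; the URLs are snapshotted first since termination empties the manager:

```python
async def terminate_node_all():
    """Terminate all nodes and drop their PD connection state."""
    urls = list(node_manager.nodes.keys())
    await node_manager.terminate_all_nodes()
    for url in urls:
        # Deregister each terminated endpoint from the PD connection pool.
        app_settings.pd_connection_pool.dereg_instance(url)
    return JSONResponse(status_code=200, content={'message': 'All nodes terminated successfully'})
```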
```python
async def terminate_node(self, node_url: str):
    """Terminate a node."""
    if node_url not in self.nodes:
        raise KeyError(f'Node {node_url} does not exist')

    self.nodes.pop(node_url)
    self._invalidate_nodes_cache()

    session = await connector.get_session()
    async with session.get(f'{node_url}/terminate', headers={'accept': 'application/json'}) as response:
        if response.status != 200:
            text = await response.text()
            raise RuntimeError(f'Failed to terminate node {node_url}, status={response.status}, msg={text}')
```
Copilot AI (Feb 12, 2026):
terminate_node() pops the node and invalidates the cache but never calls connector.update(len(self.nodes)) (unlike add/remove). This means aiohttp connection limits may remain sized for the old host count. Update the connector limits after node removal/termination for consistent behavior.
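A sketch of the suggested fix for the removal path, assuming `connector.update` is a coroutine as the add/remove paths suggest:

```python
self.nodes.pop(node_url)
self._invalidate_nodes_cache()
# Keep aiohttp connection limits in step with the live host count,
# mirroring what add/remove already do.
await connector.update(len(self.nodes))
```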
```python
def remove_stale_nodes_by_expiration(self):
    """Remove stale nodes."""
    headers = {'accept': 'application/json'}
    to_be_deleted = [url for url in self.nodes.keys() if not self._check_node_health(url, headers)]
    for node_url in to_be_deleted:
        # Note: remove is async but we can't await here in sync method.
        # The node will be removed from dict, but async cleanup won't happen.
        if node_url in self.nodes:
            self.nodes.pop(node_url)
            self._invalidate_nodes_cache()
            logger.info(f'Removed node {node_url} due to heart beat expiration')
```
Copilot AI (Feb 12, 2026):
remove_stale_nodes_by_expiration() mutates self.nodes but skips connector.update() (and any other async cleanup) due to being sync. This can leave the shared session sized for removed hosts and may keep stale resources around. Consider scheduling connector.update() onto the running event loop (e.g., via asyncio.run_coroutine_threadsafe) or making the heartbeat coroutine-based so cleanup can be awaited.
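A sketch of the thread-safe scheduling option; it assumes the heartbeat thread keeps a reference to the proxy's running event loop (here stored as `self.loop`, which is not in the current code):

```python
import asyncio

def remove_stale_nodes_by_expiration(self):
    """Remove stale nodes; runs on the plain heartbeat thread."""
    headers = {'accept': 'application/json'}
    to_be_deleted = [url for url in self.nodes.keys() if not self._check_node_health(url, headers)]
    for node_url in to_be_deleted:
        self.nodes.pop(node_url, None)
        self._invalidate_nodes_cache()
        logger.info(f'Removed node {node_url} due to heart beat expiration')
    if to_be_deleted:
        # Hand the async connector resize to the proxy's event loop.
        asyncio.run_coroutine_threadsafe(connector.update(len(self.nodes)), self.loop)
```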
```python
# Cleanup
if not app_settings.dummy_prefill:
    prefill_id = prefill_info.get('id')
    if prefill_id:
        app_settings.pd_connection_pool.unshelf_prefill_session((p_url, d_node.url), prefill_id)
```
Copilot AI (Feb 12, 2026):
unshelf_prefill_session() is called during cleanup even when handle_shelf is false / no shelf_prefill_session() was performed. PDConnectionPool.unshelf_prefill_session uses set.remove() and will raise KeyError, which can break DistServe chat requests. Only unshelf if the session was actually shelved (e.g., guard with handle_shelf or track whether shelving happened).
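One way to guard the cleanup, sketched with a local `shelved` flag; the surrounding control flow is abbreviated and the forwarding step is a placeholder:

```python
shelved = False
if handle_shelf and not app_settings.dummy_prefill:
    app_settings.pd_connection_pool.shelf_prefill_session((p_url, d_node.url), prefill_id)
    shelved = True
try:
    ...  # forward the request to the decode node
finally:
    # Only undo the shelving that actually happened, avoiding KeyError
    # from set.remove() inside unshelf_prefill_session.
    if shelved:
        app_settings.pd_connection_pool.unshelf_prefill_session((p_url, d_node.url), prefill_id)
```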
```python
def report_model_not_found(content: str):
    """Report model not found error."""
    return JSONResponse(status_code=404, content={'error_code': ErrorCodes.MODEL_NOT_FOUND.value, 'error_msg': content})
```
Copilot AI (Feb 12, 2026):
report_model_not_found returns {error_code, error_msg}, while other proxy errors (e.g., API timeout) use {error_code, text} based on err_msg. This inconsistency is likely to break clients that parse the proxy error shape. Consider standardizing on the existing text field (or updating all error paths consistently).
Suggested change:

```diff
-return JSONResponse(status_code=404, content={'error_code': ErrorCodes.MODEL_NOT_FOUND.value, 'error_msg': content})
+return JSONResponse(
+    status_code=404,
+    content={'error_code': ErrorCodes.MODEL_NOT_FOUND.value, 'text': content},
+)
```
```diff
 async def terminate_node(node: NodeModel):
     """Terminate a node."""
-    try:
-        node_url = node.url
-        success = node_manager.terminate_node(node_url)
-        if not success:
-            return f'Failed to terminate node {node_url}'
-        return 'Terminated successfully'
-    except:  # noqa
-        logger.error(f'Terminate node {node_url} failed.')
-        return 'Failed to terminate node {node_url}, please check the input url.'
+    await node_manager.terminate_node(node.url)
+    logger.info(f'Terminated node {node.url} successfully')
+    return JSONResponse(status_code=200, content={'message': 'Terminated successfully', 'url': node.url})
```
Copilot AI (Feb 12, 2026):
/nodes/terminate removes a node but does not deregister it from pd_connection_pool (unlike /nodes/remove). This can leave stale endpoints/links in PDConnectionPool and affect later connection management. Consider calling app_settings.pd_connection_pool.dereg_instance(node.url) here as well.
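A sketch of the suggested addition, reusing `dereg_instance` the same way `/nodes/remove` does:

```python
async def terminate_node(node: NodeModel):
    """Terminate a node and drop its PD connection state."""
    await node_manager.terminate_node(node.url)
    # Deregister the endpoint from the PD connection pool as well.
    app_settings.pd_connection_pool.dereg_instance(node.url)
    logger.info(f'Terminated node {node.url} successfully')
    return JSONResponse(status_code=200, content={'message': 'Terminated successfully', 'url': node.url})
```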
Thanks for your contribution; we appreciate it a lot. The following instructions will make your pull request healthier and help it receive feedback more easily. If you do not understand some items, don't worry; just open the pull request and seek help from the maintainers.
Motivation
Please describe the motivation of this PR and the goal you want to achieve through this PR.
Modification
Please briefly describe what modification is made in this PR.
BC-breaking (Optional)
Does the modification introduce changes that break the backward-compatibility of the downstream repositories?
If so, please describe how it breaks the compatibility and how the downstream projects should modify their code to keep compatibility with this PR.
Use cases (Optional)
If this PR introduces a new feature, it is better to list some use cases here, and update the documentation.
Checklist