Conversation
Summary of Changes: Hello @hjh0119, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed.
Code Review
This pull request aims to add compatibility with vLLM 0.14 by updating swift/pipelines/infer/rollout.py. The main change is a local re-implementation of WeightSyncWorkerExtension to handle API changes in vLLM's distributed communication, including support for CUDA, XPU, and Ascend devices.
My review has identified several critical issues related to the new XPU implementation. The current code will fail on XPU systems due to incorrect device handling for tensor allocation and improper use of broadcast and barrier calls. I have provided specific code suggestions to fix these by adding the necessary conditional logic for XPU devices. Additionally, I've included a medium-severity suggestion to refactor a redundant check for improved code clarity.
Diff context (update_named_param):

      weight = torch.empty(shape, dtype=dtype, device=self.communicator.device)
      # Use NCCL to broadcast the updated weights from the client (src) to all workers.
    - self._comm.broadcast(weight, src=self.client_rank)
    - self._comm.group.barrier()
    + self.communicator.broadcast(weight, src=self.client_rank)
    + self.communicator.group.barrier()
The current implementation for updating named parameters will fail on XPU devices. The self.communicator for XPU is a torch.distributed.ProcessGroup, which does not have a .device attribute, causing an AttributeError. Additionally, the broadcast and barrier calls are incorrect for this process group type.
You should add conditional logic to handle XPU devices correctly, similar to how it's done in recent versions of trl.
Suggested change:

    device = self.device if is_torch_xpu_available() else self.communicator.device
    weight = torch.empty(shape, dtype=dtype, device=device)
    # Use NCCL to broadcast the updated weights from the client (src) to all workers.
    if is_torch_xpu_available():
        torch.distributed.broadcast(tensor=weight, src=self.client_rank, group=self.communicator)
        self.communicator.barrier().wait()
    else:
        self.communicator.broadcast(weight, src=self.client_rank)
        self.communicator.group.barrier()
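The same XPU-vs-CUDA branch recurs in the remaining suggestions below. As a minimal illustration only (not code from this PR), the pattern could be factored into one helper method on the worker extension. The helper name _broadcast_from_client is hypothetical, is_torch_xpu_available is assumed to be importable from transformers.utils, and self.communicator is assumed to be vLLM's PyNcclCommunicator on CUDA or a torch.distributed ProcessGroup on XPU, as described in the comment above.

```python
# Hypothetical helper, not from the PR: one place for the XPU/CUDA branching that
# the suggestions in this review repeat in each weight-update method.
import torch
import torch.distributed as dist
from transformers.utils import is_torch_xpu_available  # assumed import location


def _broadcast_from_client(self, shape, dtype):
    """Allocate an empty tensor on this worker and receive its data from the client rank."""
    if is_torch_xpu_available():
        # XPU: self.communicator is assumed to be a torch.distributed ProcessGroup,
        # so use the generic collective and wait on the barrier work handle.
        tensor = torch.empty(shape, dtype=dtype, device=self.device)
        dist.broadcast(tensor, src=self.client_rank, group=self.communicator)
        self.communicator.barrier().wait()
    else:
        # CUDA: self.communicator is assumed to be vLLM's PyNcclCommunicator,
        # matching the broadcast/barrier calls already used in this file.
        tensor = torch.empty(shape, dtype=dtype, device=self.communicator.device)
        self.communicator.broadcast(tensor, src=self.client_rank)
        self.communicator.group.barrier()
    return tensor
```

Each update method would then reduce to a call such as weight = self._broadcast_from_client(shape, dtype).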
Diff context:

    flatten_tensor = torch.empty(flatten_tensor_length, dtype=dtype, device=self.communicator.device)
    self.communicator.broadcast(flatten_tensor, src=self.client_rank)
    self.communicator.group.barrier()
Similar to update_named_param, this method has incorrect logic for XPU devices. self.communicator.device will raise an AttributeError, and the broadcast/barrier calls are incorrect for ProcessGroup. This will cause a runtime failure on XPU systems.
Suggested change:

    device = self.device if is_torch_xpu_available() else self.communicator.device
    flatten_tensor = torch.empty(flatten_tensor_length, dtype=dtype, device=device)
    if is_torch_xpu_available():
        torch.distributed.broadcast(tensor=flatten_tensor, src=self.client_rank, group=self.communicator)
        self.communicator.barrier().wait()
    else:
        self.communicator.broadcast(flatten_tensor, src=self.client_rank)
        self.communicator.group.barrier()

Diff context:

    tensor = torch.empty(shape, dtype=dtype, device=self.communicator.device)
    self.communicator.broadcast(tensor, src=self.client_rank)
This part of the code has incorrect logic for XPU devices. The device for tensor allocation is wrong, and the broadcast call needs to be conditional to support the ProcessGroup used for XPU.
Suggested change:

    tensor = torch.empty(shape, dtype=dtype, device=self.device if is_torch_xpu_available() else self.communicator.device)
    if is_torch_xpu_available():
        torch.distributed.broadcast(tensor=tensor, src=self.client_rank, group=self.communicator)
    else:
        self.communicator.broadcast(tensor, src=self.client_rank)

Diff context:

      named_params[name] = tensor
    - self._comm.group.barrier()
    + self.communicator.group.barrier()
Diff context:

      flatten_tensor = torch.empty(flatten_tensor_length, dtype=dtype, device=self.communicator.device)
    - self._comm.broadcast(flatten_tensor, src=self.client_rank)
    - self._comm.group.barrier()
    + self.communicator.broadcast(flatten_tensor, src=self.client_rank)
    + self.communicator.group.barrier()
This method has the same critical issue with XPU device handling as other update methods. self.communicator.device will fail, and the broadcast and barrier calls are incorrect for the ProcessGroup used in the XPU case.
Suggested change:

    device = self.device if is_torch_xpu_available() else self.communicator.device
    flatten_tensor = torch.empty(flatten_tensor_length, dtype=dtype, device=device)
    if is_torch_xpu_available():
        torch.distributed.broadcast(tensor=flatten_tensor, src=self.client_rank, group=self.communicator)
        self.communicator.barrier().wait()
    else:
        self.communicator.broadcast(flatten_tensor, src=self.client_rank)
        self.communicator.group.barrier()
swift/pipelines/infer/rollout.py
Outdated
Diff context:

    if torch.cuda.is_available() or (is_torch_xpu_available()
                                     and hasattr(torch.xpu.get_device_properties(self.device), 'uuid')):
        accelerator_module = torch.xpu if is_torch_xpu_available() else torch.cuda
The check is_torch_xpu_available() is performed twice in this conditional block. You can simplify this logic by using a variable to store the result of the check, which improves readability and avoids redundancy.
Suggested change:

    use_xpu_with_uuid = is_torch_xpu_available() and hasattr(torch.xpu.get_device_properties(self.device), 'uuid')
    if torch.cuda.is_available() or use_xpu_with_uuid:
        accelerator_module = torch.xpu if use_xpu_with_uuid else torch.cuda
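As a design note, caching the check also keeps any downstream device-identification logic readable. The sketch below is purely illustrative and not from the PR: the method name _report_device_id and the fallback branch are hypothetical, and it only shows how accelerator_module might be used to read a device UUID, assuming get_device_properties(...).uuid exists on CUDA and, when the hasattr check passes, on XPU.

```python
import torch
from transformers.utils import is_torch_xpu_available  # assumed import location


def _report_device_id(self) -> str:
    """Hypothetical sketch: return a stable identifier for this worker's accelerator."""
    use_xpu_with_uuid = is_torch_xpu_available() and hasattr(
        torch.xpu.get_device_properties(self.device), 'uuid')
    if torch.cuda.is_available() or use_xpu_with_uuid:
        accelerator_module = torch.xpu if use_xpu_with_uuid else torch.cuda
        # CUDA device properties expose `uuid`; the hasattr check above guards XPU.
        return str(accelerator_module.get_device_properties(self.device).uuid)
    # Fallback when no UUID is exposed (illustrative only).
    return f'device-{self.device}'
```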
/gemini review
Code Review
The pull request updates the codebase to be compatible with vLLM 0.14, primarily by re-implementing the WeightSyncWorkerExtension class and adjusting related communication protocols. This involves removing the _check_trl_version method and the client_device_uuid field from communication requests, and updating the weight synchronization logic to use the new communicator attribute. The changes are well-aligned with the stated goal of vLLM 0.14 compatibility. However, there is a critical issue in the re-implemented WeightSyncWorkerExtension where self.device is accessed before being defined, which needs to be addressed.
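For readers who have not seen the mechanism being re-implemented: vLLM loads a worker extension class by dotted path through the worker_extension_cls engine argument and invokes its methods with collective_rpc. The sketch below is illustrative only; the class path and the update_named_param argument list are assumptions inferred from the file and method names mentioned in this review, not code from the PR.

```python
# Illustrative wiring only (not from the PR). The dotted path and the RPC
# arguments below are assumptions based on the names discussed in this review.
from vllm import LLM

llm = LLM(
    model='Qwen/Qwen2.5-0.5B-Instruct',  # placeholder model
    worker_extension_cls='swift.pipelines.infer.rollout.WeightSyncWorkerExtension',
)

# The rollout server can then drive weight sync on every worker via RPC, e.g.:
llm.collective_rpc('update_named_param', args=('model.embed_tokens.weight', 'bfloat16', (151936, 896)))
```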
Diff context:

        import torch_npu
        torch_npu.npu.set_device(device)
    else:
        device = self.device
The device attribute is accessed via self.device but is not defined within the WeightSyncWorkerExtension class. This will lead to an AttributeError. It should likely be accessed from self.model_runner.device, as model_runner is an attribute typically available in worker extensions and holds the device information.
Suggested change:

    - device = self.device
    + device = self.model_runner.device
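If the extension is meant to work both with worker classes that define a device attribute and with those that do not, a defensive lookup is another option. This is a hedged sketch, not the PR's code; _resolve_device is a hypothetical helper name.

```python
def _resolve_device(self):
    # Hypothetical helper: prefer an existing `device` attribute on the mixed-in
    # worker class, otherwise fall back to the model runner's device as the
    # comment above suggests.
    device = getattr(self, 'device', None)
    return device if device is not None else self.model_runner.device
```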
No description provided.