[fix] Support IPv6 address#42
Conversation
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
There was a problem hiding this comment.
Pull request overview
This PR aims to add IPv6 support for TransferQueue’s ZMQ communication by adjusting endpoint formatting and enabling the ZMQ IPv6 socket option where appropriate.
Changes:
- Added
is_ipv6_address()/format_zmq_address()utilities and updatedZMQServerInfo.to_addr()to use them. - Extended
create_zmq_socket()with an optionalipparameter to enablezmq.IPV6when needed. - Updated several bind/connect endpoints to use bracketed IP formatting.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
transfer_queue/utils/zmq_utils.py |
Adds IPv6 detection/address formatting helpers and optional IPv6 socket option enabling. |
transfer_queue/storage/simple_backend.py |
Passes node IP into socket creation and updates bind endpoint formatting. |
transfer_queue/storage/managers/simple_backend_manager.py |
Updates storage-unit connect endpoint formatting and passes IP for IPv6 option. |
transfer_queue/controller.py |
Passes node IP into socket creation and updates controller bind endpoints. |
transfer_queue/client.py |
Updates controller connect endpoint formatting and passes IP for IPv6 option. |
tests/test_async_simple_storage_manager.py |
Adds mypy suppression comments on request type usage in tests. |
Comments suppressed due to low confidence (2)
transfer_queue/utils/zmq_utils.py:265
create_zmq_socket()only enableszmq.IPV6when callers pass theipparameter. There are existing call sites that create sockets withoutipand then connect viaZMQServerInfo.to_addr(...)(e.g., storage manager handshake), so IPv6 endpoints would still fail. Consider enablingzmq.IPV6by default (or adding anaddress/endpoint_ipparameter and using that) so IPv6 support doesn’t depend on every caller being updated.
if ip is not None and is_ipv6_address(ip):
socket.setsockopt(zmq.IPV6, 1)
# Calculate buffer size based on system memory
total_mem = mem.total / 1024**3
available_mem = mem.available / 1024**3
# For systems with substantial memory (>32GB total, >16GB available):
# - Set a large 0.5GB buffer to improve throughput
# For systems with less memory:
# - Use system default (-1) to avoid excessive memory consumption
if total_mem > 32 and available_mem > 16:
buf_size = int(0.5 * 1024**3) # 0.5GB in bytes
else:
buf_size = -1 # Use system default buffer size
if socket_type in (zmq.PULL, zmq.DEALER, zmq.ROUTER):
socket.setsockopt(zmq.RCVHWM, 0)
socket.setsockopt(zmq.RCVBUF, buf_size)
if socket_type in (zmq.PUSH, zmq.DEALER, zmq.ROUTER):
transfer_queue/utils/zmq_utils.py:236
- New IPv6 formatting logic in
is_ipv6_address()/format_zmq_address()isn’t covered by tests. Adding unit tests for both IPv4 (e.g.,127.0.0.1) and IPv6 (e.g.,::1) would prevent regressions and catch invalid endpoint formats early.
def is_ipv6_address(ip: str) -> bool:
"""Check if the given IP address is an IPv6 address."""
try:
socket.inet_pton(socket.AF_INET6, ip)
return True
except OSError:
return False
def get_free_port() -> str:
"""Get free port of the host."""
with socket.socket() as sock:
sock.bind(("", 0))
return sock.getsockname()[1]
def create_zmq_socket(
ctx: zmq.Context,
socket_type: Any,
identity: Optional[bytestr] = None,
ip: Optional[str] = None,
) -> zmq.Socket:
"""Create ZMQ socket.
Args:
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self.handshake_socket = create_zmq_socket( | ||
| ctx=self.zmq_context, | ||
| socket_type=zmq.ROUTER, | ||
| ip=self._node_ip, | ||
| ) | ||
| self.handshake_socket.bind(f"tcp://{self._node_ip}:{self._handshake_socket_port}") | ||
| self.handshake_socket.bind(f"tcp://[{self._node_ip}]:{self._handshake_socket_port}") | ||
|
|
||
| self.request_handle_socket = create_zmq_socket( | ||
| ctx=self.zmq_context, | ||
| socket_type=zmq.ROUTER, | ||
| ip=self._node_ip, | ||
| ) | ||
| self.request_handle_socket.bind(f"tcp://{self._node_ip}:{self._request_handle_socket_port}") | ||
| self.request_handle_socket.bind(f"tcp://[{self._node_ip}]:{self._request_handle_socket_port}") |
There was a problem hiding this comment.
The bind endpoint uses tcp://[{self._node_ip}]:..., which will also bracket IPv4 literals. This risks breaking controller startup on typical IPv4-only setups. Use the common address formatter (with IPv6-only bracketing) for these bind endpoints so IPv4 remains tcp://ip:port while IPv6 becomes tcp://[ip]:port.
| address = f"tcp://[{server_info.ip}]:{server_info.ports.get(socket_name)}" | ||
| identity = f"{self.client_id}_to_{server_info.id}_{uuid4().hex[:8]}".encode() | ||
| sock = create_zmq_socket(context, zmq.DEALER, identity=identity) | ||
| sock = create_zmq_socket(context, zmq.DEALER, identity=identity, ip=server_info.ip) | ||
|
|
There was a problem hiding this comment.
The connect address is built as tcp://[{server_info.ip}]:... unconditionally. If server_info.ip is an IPv4 literal, the brackets can make the endpoint invalid. Use a shared formatter that only adds brackets for IPv6 (or call format_zmq_address(server_info.ip, port)) so IPv4 connections keep working.
| address = f"tcp://[{server_info.ip}]:{server_info.ports.get(socket_name)}" | ||
| identity = f"{self.storage_manager_id}_to_{server_info.id}_{uuid4().hex[:8]}".encode() | ||
| sock = create_zmq_socket(context, zmq.DEALER, identity=identity) | ||
| sock = create_zmq_socket(context, zmq.DEALER, identity=identity, ip=server_info.ip) | ||
|
|
There was a problem hiding this comment.
The storage unit connect address is built as tcp://[{server_info.ip}]:... for all IPs. This will likely break IPv4 endpoints (e.g., 127.0.0.1). Prefer using the shared address formatting helper that brackets only IPv6 literals.
| @@ -220,11 +229,23 @@ def create_zmq_socket( | |||
| ctx: zmq.Context, | |||
| socket_type: Any, | |||
| identity: Optional[bytestr] = None, | |||
| ip: Optional[str] = None, | |||
| ) -> zmq.Socket: | |||
| """Create ZMQ socket.""" | |||
| """Create ZMQ socket. | |||
|
|
|||
| Args: | |||
There was a problem hiding this comment.
format_zmq_address() currently wraps all IPs in brackets (tcp://[{ip}]:{port}). Brackets are required for IPv6 literals but can break IPv4 endpoints (e.g., tcp://[127.0.0.1]:5555). Use is_ipv6_address(ip) (or ipaddress.ip_address) to conditionally format as tcp://{ip}:{port} for IPv4 and tcp://[{ip}]:{port} for IPv6, and update the docstring accordingly.
| self.put_get_socket = create_zmq_socket(self.zmq_context, zmq.ROUTER, ip=self._node_ip) | ||
|
|
||
| while True: | ||
| try: | ||
| self._put_get_socket_port = get_free_port() | ||
| self.put_get_socket.bind(f"tcp://{self._node_ip}:{self._put_get_socket_port}") | ||
| self.put_get_socket.bind(f"tcp://[{self._node_ip}]:{self._put_get_socket_port}") | ||
| break |
There was a problem hiding this comment.
Binding with tcp://[{self._node_ip}]:... wraps IPv4 addresses in brackets as well. If _node_ip is an IPv4 literal (common for ray.util.get_node_ip_address()), this can produce an invalid ZMQ endpoint. Prefer using the shared format_zmq_address(self._node_ip, self._put_get_socket_port) (with conditional IPv6 bracketing) for the bind address.
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
1 similar comment
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
Support IPv6 environment.