Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4d04536
support symm memory on XPU devices
zhangxiaoli73 Sep 15, 2025
9563c00
remove atomic_ref
zhangxiaoli73 Feb 27, 2026
3f68da3
symm both xpu and ishmem
Chao1Han Dec 8, 2025
ac8f99b
unify IpcChannel and rm dynamic ze
Chao1Han Dec 10, 2025
a18c6d5
add miss file
Chao1Han Dec 11, 2025
c86f32f
revert barrier implementations
zhangxiaoli73 Jan 20, 2026
6bf4775
rm ishmem related file
Chao1Han May 15, 2026
b97ade9
clean up code and use sycl ipc
Chao1Han May 15, 2026
381b96b
Add basic test case
Chao1Han May 15, 2026
03cedc1
lint
Chao1Han May 15, 2026
48766d6
remove ishmem related
Chao1Han May 15, 2026
9cbdf51
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han May 18, 2026
8805b91
address some comments
Chao1Han May 18, 2026
cff6db1
Address PR review feedback on XPUSymmetricMemory And remove XPUSymmet…
Chao1Han May 20, 2026
c8fdaa3
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han May 21, 2026
60b5d07
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han May 21, 2026
307db6d
Apply suggestions from code review
zhangxiaoli73 May 22, 2026
4d428fb
default use signal barrier
Chao1Han May 22, 2026
40dfa7d
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han May 25, 2026
9a488d9
align timeout check in api definition
zhangxiaoli73 May 25, 2026
ec2bfba
Apply suggestions from code review
zhangxiaoli73 May 25, 2026
c1b66cc
remove unused IpcChannel
Chao1Han May 25, 2026
af0dc74
lint
Chao1Han May 25, 2026
1133f73
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han May 26, 2026
9fb690f
Implemented prctl(PR_SET_PTRACER, ppid, 0, 0, 0) during the initializ…
Chao1Han May 26, 2026
fd39f3d
lint
Chao1Han May 26, 2026
696f527
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han May 29, 2026
947bd9b
ppid to any
Chao1Han May 29, 2026
076bc57
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han Jun 1, 2026
84b0012
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han Jun 1, 2026
97ae6ca
rm fallback barrier and first test symm
Chao1Han Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/xccl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
file(GLOB xccl_h "*.hpp")
file(GLOB xccl_cpp "*.cpp")
list(REMOVE_ITEM xccl_cpp "${CMAKE_CURRENT_SOURCE_DIR}/NanCheck_XPU.cpp")
list(REMOVE_ITEM xccl_cpp "${CMAKE_CURRENT_SOURCE_DIR}/Signal.cpp")

list(APPEND ATen_XPU_XCCL_SRCS ${xccl_cpp})
list(APPEND ATen_XPU_SYCL_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/NanCheck_XPU.cpp")
list(APPEND ATen_XPU_SYCL_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/Signal.cpp")

set(ATen_XPU_XCCL_SRCS ${ATen_XPU_XCCL_SRCS} PARENT_SCOPE)
set(ATen_XPU_SYCL_SRCS ${ATen_XPU_SYCL_SRCS} PARENT_SCOPE)
Expand Down
201 changes: 201 additions & 0 deletions src/xccl/Signal.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#include <ATen/xpu/XPUContext.h>
#include <comm/SYCLContext.h>
#include <xccl/Signal.hpp>
#include <chrono>

namespace c10d::symmetric_memory {

struct barrierKernel {
void operator()(sycl::nd_item<1> item) const {
auto thread_id = item.get_local_id(0);

if (thread_id < world_size) {
auto target_rank = thread_id;
if (target_rank == rank) {
return;
}
auto put_success = try_put_signal_device<std::memory_order_release>(
signal_pads[target_rank] + world_size * channel + rank, timeout_ms);
if (!put_success) {
SYCL_KERNEL_ASSERT(false);
}
Comment thread
Chao1Han marked this conversation as resolved.

auto wait_success = try_wait_signal_device<std::memory_order_acquire>(
signal_pads[rank] + world_size * channel + target_rank, timeout_ms);
if (!wait_success) {
SYCL_KERNEL_ASSERT(false);
}
}
}

barrierKernel(
uint32_t** signal_pads,
int channel,
int rank,
int world_size,
size_t timeout_ms)
: signal_pads(signal_pads),
channel(channel),
rank(rank),
world_size(world_size),
timeout_ms(timeout_ms) {}

private:
uint32_t** signal_pads;
int channel;
int rank;
int world_size;
size_t timeout_ms;
};

void barrier_impl_xpu(
uint32_t** signal_pads,
int channel,
int rank,
int world_size,
size_t timeout_ms,
at::xpu::XPUStream& stream) {
int64_t maxNumThreadsPerBlock = syclMaxWorkGroupSize<barrierKernel>();
const size_t numThreadsPerBlock =
std::min<size_t>(maxNumThreadsPerBlock, std::max(32, world_size));

if (!(numThreadsPerBlock > 0)) {
return;
}
int64_t numBlocks = 1;
auto global_range = numBlocks * numThreadsPerBlock;
auto local_range = numThreadsPerBlock;

using Kernel = barrierKernel;
auto kfn = Kernel(signal_pads, channel, rank, world_size, timeout_ms);

sycl_kernel_submit(global_range, local_range, stream.queue(), kfn);
}
Comment thread
Chao1Han marked this conversation as resolved.

struct putSignalKernel {
void operator()(sycl::nd_item<1> item) const {
auto thread_id = item.get_local_id(0);

if (thread_id == 0) {
auto put_success = try_put_signal_device<std::memory_order_release>(
signal_pads[dst_rank] + world_size * channel + rank, 10000000);
if (!put_success) {
SYCL_KERNEL_ASSERT(false);
}
}
}

putSignalKernel(
uint32_t** signal_pads,
int dst_rank,
int channel,
int rank,
int world_size,
size_t timeout_ms)
: signal_pads(signal_pads),
dst_rank(dst_rank),
channel(channel),
rank(rank),
world_size(world_size),
timeout_ms(timeout_ms) {}

private:
uint32_t** signal_pads;
int dst_rank;
int channel;
int rank;
int world_size;
size_t timeout_ms;
};

void put_signal_impl_xpu(
uint32_t** signal_pads,
int dst_rank,
int channel,
int rank,
int world_size,
size_t timeout_ms,
at::xpu::XPUStream& stream) {
int64_t maxNumThreadsPerBlock = syclMaxWorkGroupSize<putSignalKernel>();
const size_t numThreadsPerBlock = std::min<size_t>(maxNumThreadsPerBlock, 32);

if (!(numThreadsPerBlock > 0)) {
return;
}

int64_t numBlocks = 1;
auto global_range = numBlocks * numThreadsPerBlock;
auto local_range = numThreadsPerBlock;

using Kernel = putSignalKernel;
auto kfn =
Kernel(signal_pads, dst_rank, channel, rank, world_size, timeout_ms);

sycl_kernel_submit(global_range, local_range, stream.queue(), kfn);
}

struct waitSignalKernel {
void operator()(sycl::nd_item<1> item) const {
auto thread_id = item.get_local_id(0);

if (thread_id == 0) {
auto wait_success = try_wait_signal_device<std::memory_order_acquire>(
signal_pads[rank] + world_size * channel + src_rank, 10000000);
if (!wait_success) {
SYCL_KERNEL_ASSERT(false);
}

sycl::atomic_fence(sycl::memory_order_seq_cst, sycl::memory_scope_system);
}
}

waitSignalKernel(
uint32_t** signal_pads,
int src_rank,
int channel,
int rank,
int world_size,
size_t timeout_ms)
: signal_pads(signal_pads),
src_rank(src_rank),
channel(channel),
rank(rank),
world_size(world_size),
timeout_ms(timeout_ms) {}

private:
uint32_t** signal_pads;
int src_rank;
int channel;
int rank;
int world_size;
size_t timeout_ms;
};

void wait_signal_impl_xpu(
uint32_t** signal_pads,
int src_rank,
int channel,
int rank,
int world_size,
size_t timeout_ms,
at::xpu::XPUStream& stream) {
int64_t maxNumThreadsPerBlock = syclMaxWorkGroupSize<waitSignalKernel>();
const size_t numThreadsPerBlock = std::min<size_t>(maxNumThreadsPerBlock, 32);

if (!(numThreadsPerBlock > 0)) {
return;
}

int64_t numBlocks = 1;
auto global_range = numBlocks * numThreadsPerBlock;
auto local_range = numThreadsPerBlock;

using Kernel = waitSignalKernel;
auto kfn =
Kernel(signal_pads, src_rank, channel, rank, world_size, timeout_ms);

sycl_kernel_submit(global_range, local_range, stream.queue(), kfn);
}

} // namespace c10d::symmetric_memory
88 changes: 88 additions & 0 deletions src/xccl/Signal.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#pragma once

#include <atomic>

#include <ATen/native/xpu/sycl/MemoryAccess.h>
#include <comm/SYCLContext.h>

namespace c10d::symmetric_memory {

using at::native::memory::get_alignment;

// =============================================================================
// Signal primitives using store/load + atomic_fence
// (sycl::atomic_ref is not supported, use explicit fence instead)
// =============================================================================

// Store value with release fence (for put_signal)
// Order: store first, then release fence to flush the store
inline void store_release(uint32_t* addr, uint32_t val) {
*addr = val;
sycl::atomic_fence(sycl::memory_order::release, sycl::memory_scope::system);
}

// Load value with acquire fence (for get_signal/wait_signal)
// Order: acquire fence first, then load to see the latest value
inline uint32_t load_acquire(uint32_t* addr) {
sycl::atomic_fence(sycl::memory_order::acquire, sycl::memory_scope::system);
uint32_t val = *addr;
return val;
}

// =============================================================================
// Put signal: wait until addr == 0, then set to 1 (release semantics)
// =============================================================================

template <std::memory_order Sem>
bool try_put_signal_device(uint32_t* addr, size_t timeout_ms) {
// Wait until the slot is free (value == 0)
while (load_acquire(addr) != 0) {
// Spin wait (no timeout check as IGC issue)
continue;
}
// Set signal to 1 with release semantics
store_release(addr, 1);
return true;
}

// =============================================================================
// Wait signal: wait until addr == 1, then set to 0 (acquire semantics)
// =============================================================================
template <std::memory_order Sem>
bool try_wait_signal_device(uint32_t* addr, size_t timeout_ms) {
// Wait until signal is set (value == 1)
while (load_acquire(addr) != 1) {
// Spin wait (no timeout check as IGC issue)
continue;
}
// Clear signal to 0 with release semantics
store_release(addr, 0);
return true;
}
Comment thread
Chao1Han marked this conversation as resolved.
Comment thread
Chao1Han marked this conversation as resolved.

void barrier_impl_xpu(
uint32_t** signal_pads,
int channel,
int rank,
int world_size,
size_t timeout_ms,
at::xpu::XPUStream& stream);

void put_signal_impl_xpu(
uint32_t** signal_pads,
int dst_rank,
int channel,
int rank,
int world_size,
size_t timeout_ms,
at::xpu::XPUStream& stream);

void wait_signal_impl_xpu(
uint32_t** signal_pads,
int src_rank,
int channel,
int rank,
int world_size,
size_t timeout_ms,
at::xpu::XPUStream& stream);
} // namespace c10d::symmetric_memory
Loading
Loading