-
Notifications
You must be signed in to change notification settings - Fork 108
Add symmetric memory support on XPU device #2041
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
zhangxiaoli73
wants to merge
31
commits into
main
Choose a base branch
from
cherry/add-symm-xpu
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
31 commits
Select commit
Hold shift + click to select a range
4d04536
support symm memory on XPU devices
zhangxiaoli73 9563c00
remove atomic_ref
zhangxiaoli73 3f68da3
symm both xpu and ishmem
Chao1Han ac8f99b
unify IpcChannel and rm dynamic ze
Chao1Han a18c6d5
add miss file
Chao1Han c86f32f
revert barrier implementations
zhangxiaoli73 6bf4775
rm ishmem related file
Chao1Han b97ade9
clean up code and use sycl ipc
Chao1Han 381b96b
Add basic test case
Chao1Han 03cedc1
lint
Chao1Han 48766d6
remove ishmem related
Chao1Han 9cbdf51
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han 8805b91
address some comments
Chao1Han cff6db1
Address PR review feedback on XPUSymmetricMemory And remove XPUSymmet…
Chao1Han c8fdaa3
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han 60b5d07
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han 307db6d
Apply suggestions from code review
zhangxiaoli73 4d428fb
default use signal barrier
Chao1Han 40dfa7d
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han 9a488d9
align timeout check in api definition
zhangxiaoli73 ec2bfba
Apply suggestions from code review
zhangxiaoli73 c1b66cc
remove unused IpcChannel
Chao1Han af0dc74
lint
Chao1Han 1133f73
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han 9fb690f
Implemented prctl(PR_SET_PTRACER, ppid, 0, 0, 0) during the initializ…
Chao1Han fd39f3d
lint
Chao1Han 696f527
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han 947bd9b
ppid to any
Chao1Han 076bc57
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han 84b0012
Merge branch 'main' into cherry/add-symm-xpu
Chao1Han 97ae6ca
rm fallback barrier and first test symm
Chao1Han File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| #include <ATen/xpu/XPUContext.h> | ||
| #include <comm/SYCLContext.h> | ||
| #include <xccl/Signal.hpp> | ||
| #include <chrono> | ||
|
|
||
| namespace c10d::symmetric_memory { | ||
|
|
||
| struct barrierKernel { | ||
| void operator()(sycl::nd_item<1> item) const { | ||
| auto thread_id = item.get_local_id(0); | ||
|
|
||
| if (thread_id < world_size) { | ||
| auto target_rank = thread_id; | ||
| if (target_rank == rank) { | ||
| return; | ||
| } | ||
| auto put_success = try_put_signal_device<std::memory_order_release>( | ||
| signal_pads[target_rank] + world_size * channel + rank, timeout_ms); | ||
| if (!put_success) { | ||
| SYCL_KERNEL_ASSERT(false); | ||
| } | ||
|
|
||
| auto wait_success = try_wait_signal_device<std::memory_order_acquire>( | ||
| signal_pads[rank] + world_size * channel + target_rank, timeout_ms); | ||
| if (!wait_success) { | ||
| SYCL_KERNEL_ASSERT(false); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| barrierKernel( | ||
| uint32_t** signal_pads, | ||
| int channel, | ||
| int rank, | ||
| int world_size, | ||
| size_t timeout_ms) | ||
| : signal_pads(signal_pads), | ||
| channel(channel), | ||
| rank(rank), | ||
| world_size(world_size), | ||
| timeout_ms(timeout_ms) {} | ||
|
|
||
| private: | ||
| uint32_t** signal_pads; | ||
| int channel; | ||
| int rank; | ||
| int world_size; | ||
| size_t timeout_ms; | ||
| }; | ||
|
|
||
| void barrier_impl_xpu( | ||
| uint32_t** signal_pads, | ||
| int channel, | ||
| int rank, | ||
| int world_size, | ||
| size_t timeout_ms, | ||
| at::xpu::XPUStream& stream) { | ||
| int64_t maxNumThreadsPerBlock = syclMaxWorkGroupSize<barrierKernel>(); | ||
| const size_t numThreadsPerBlock = | ||
| std::min<size_t>(maxNumThreadsPerBlock, std::max(32, world_size)); | ||
|
|
||
| if (!(numThreadsPerBlock > 0)) { | ||
| return; | ||
| } | ||
| int64_t numBlocks = 1; | ||
| auto global_range = numBlocks * numThreadsPerBlock; | ||
| auto local_range = numThreadsPerBlock; | ||
|
|
||
| using Kernel = barrierKernel; | ||
| auto kfn = Kernel(signal_pads, channel, rank, world_size, timeout_ms); | ||
|
|
||
| sycl_kernel_submit(global_range, local_range, stream.queue(), kfn); | ||
| } | ||
|
Chao1Han marked this conversation as resolved.
|
||
|
|
||
| struct putSignalKernel { | ||
| void operator()(sycl::nd_item<1> item) const { | ||
| auto thread_id = item.get_local_id(0); | ||
|
|
||
| if (thread_id == 0) { | ||
| auto put_success = try_put_signal_device<std::memory_order_release>( | ||
| signal_pads[dst_rank] + world_size * channel + rank, 10000000); | ||
| if (!put_success) { | ||
| SYCL_KERNEL_ASSERT(false); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| putSignalKernel( | ||
| uint32_t** signal_pads, | ||
| int dst_rank, | ||
| int channel, | ||
| int rank, | ||
| int world_size, | ||
| size_t timeout_ms) | ||
| : signal_pads(signal_pads), | ||
| dst_rank(dst_rank), | ||
| channel(channel), | ||
| rank(rank), | ||
| world_size(world_size), | ||
| timeout_ms(timeout_ms) {} | ||
|
|
||
| private: | ||
| uint32_t** signal_pads; | ||
| int dst_rank; | ||
| int channel; | ||
| int rank; | ||
| int world_size; | ||
| size_t timeout_ms; | ||
| }; | ||
|
|
||
| void put_signal_impl_xpu( | ||
| uint32_t** signal_pads, | ||
| int dst_rank, | ||
| int channel, | ||
| int rank, | ||
| int world_size, | ||
| size_t timeout_ms, | ||
| at::xpu::XPUStream& stream) { | ||
| int64_t maxNumThreadsPerBlock = syclMaxWorkGroupSize<putSignalKernel>(); | ||
| const size_t numThreadsPerBlock = std::min<size_t>(maxNumThreadsPerBlock, 32); | ||
|
|
||
| if (!(numThreadsPerBlock > 0)) { | ||
| return; | ||
| } | ||
|
|
||
| int64_t numBlocks = 1; | ||
| auto global_range = numBlocks * numThreadsPerBlock; | ||
| auto local_range = numThreadsPerBlock; | ||
|
|
||
| using Kernel = putSignalKernel; | ||
| auto kfn = | ||
| Kernel(signal_pads, dst_rank, channel, rank, world_size, timeout_ms); | ||
|
|
||
| sycl_kernel_submit(global_range, local_range, stream.queue(), kfn); | ||
| } | ||
|
|
||
| struct waitSignalKernel { | ||
| void operator()(sycl::nd_item<1> item) const { | ||
| auto thread_id = item.get_local_id(0); | ||
|
|
||
| if (thread_id == 0) { | ||
| auto wait_success = try_wait_signal_device<std::memory_order_acquire>( | ||
| signal_pads[rank] + world_size * channel + src_rank, 10000000); | ||
| if (!wait_success) { | ||
| SYCL_KERNEL_ASSERT(false); | ||
| } | ||
|
|
||
| sycl::atomic_fence(sycl::memory_order_seq_cst, sycl::memory_scope_system); | ||
| } | ||
| } | ||
|
|
||
| waitSignalKernel( | ||
| uint32_t** signal_pads, | ||
| int src_rank, | ||
| int channel, | ||
| int rank, | ||
| int world_size, | ||
| size_t timeout_ms) | ||
| : signal_pads(signal_pads), | ||
| src_rank(src_rank), | ||
| channel(channel), | ||
| rank(rank), | ||
| world_size(world_size), | ||
| timeout_ms(timeout_ms) {} | ||
|
|
||
| private: | ||
| uint32_t** signal_pads; | ||
| int src_rank; | ||
| int channel; | ||
| int rank; | ||
| int world_size; | ||
| size_t timeout_ms; | ||
| }; | ||
|
|
||
| void wait_signal_impl_xpu( | ||
| uint32_t** signal_pads, | ||
| int src_rank, | ||
| int channel, | ||
| int rank, | ||
| int world_size, | ||
| size_t timeout_ms, | ||
| at::xpu::XPUStream& stream) { | ||
| int64_t maxNumThreadsPerBlock = syclMaxWorkGroupSize<waitSignalKernel>(); | ||
| const size_t numThreadsPerBlock = std::min<size_t>(maxNumThreadsPerBlock, 32); | ||
|
|
||
| if (!(numThreadsPerBlock > 0)) { | ||
| return; | ||
| } | ||
|
|
||
| int64_t numBlocks = 1; | ||
| auto global_range = numBlocks * numThreadsPerBlock; | ||
| auto local_range = numThreadsPerBlock; | ||
|
|
||
| using Kernel = waitSignalKernel; | ||
| auto kfn = | ||
| Kernel(signal_pads, src_rank, channel, rank, world_size, timeout_ms); | ||
|
|
||
| sycl_kernel_submit(global_range, local_range, stream.queue(), kfn); | ||
| } | ||
|
|
||
| } // namespace c10d::symmetric_memory | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| #pragma once | ||
|
|
||
| #include <atomic> | ||
|
|
||
| #include <ATen/native/xpu/sycl/MemoryAccess.h> | ||
| #include <comm/SYCLContext.h> | ||
|
|
||
| namespace c10d::symmetric_memory { | ||
|
|
||
| using at::native::memory::get_alignment; | ||
|
|
||
| // ============================================================================= | ||
| // Signal primitives using store/load + atomic_fence | ||
| // (sycl::atomic_ref is not supported, use explicit fence instead) | ||
| // ============================================================================= | ||
|
|
||
| // Store value with release fence (for put_signal) | ||
| // Order: store first, then release fence to flush the store | ||
| inline void store_release(uint32_t* addr, uint32_t val) { | ||
| *addr = val; | ||
| sycl::atomic_fence(sycl::memory_order::release, sycl::memory_scope::system); | ||
| } | ||
|
|
||
| // Load value with acquire fence (for get_signal/wait_signal) | ||
| // Order: acquire fence first, then load to see the latest value | ||
| inline uint32_t load_acquire(uint32_t* addr) { | ||
| sycl::atomic_fence(sycl::memory_order::acquire, sycl::memory_scope::system); | ||
| uint32_t val = *addr; | ||
| return val; | ||
| } | ||
|
|
||
| // ============================================================================= | ||
| // Put signal: wait until addr == 0, then set to 1 (release semantics) | ||
| // ============================================================================= | ||
|
|
||
| template <std::memory_order Sem> | ||
| bool try_put_signal_device(uint32_t* addr, size_t timeout_ms) { | ||
| // Wait until the slot is free (value == 0) | ||
| while (load_acquire(addr) != 0) { | ||
| // Spin wait (no timeout check as IGC issue) | ||
| continue; | ||
| } | ||
| // Set signal to 1 with release semantics | ||
| store_release(addr, 1); | ||
| return true; | ||
| } | ||
|
|
||
| // ============================================================================= | ||
| // Wait signal: wait until addr == 1, then set to 0 (acquire semantics) | ||
| // ============================================================================= | ||
| template <std::memory_order Sem> | ||
| bool try_wait_signal_device(uint32_t* addr, size_t timeout_ms) { | ||
| // Wait until signal is set (value == 1) | ||
| while (load_acquire(addr) != 1) { | ||
| // Spin wait (no timeout check as IGC issue) | ||
| continue; | ||
| } | ||
| // Clear signal to 0 with release semantics | ||
| store_release(addr, 0); | ||
| return true; | ||
| } | ||
|
Chao1Han marked this conversation as resolved.
Chao1Han marked this conversation as resolved.
|
||
|
|
||
| void barrier_impl_xpu( | ||
| uint32_t** signal_pads, | ||
| int channel, | ||
| int rank, | ||
| int world_size, | ||
| size_t timeout_ms, | ||
| at::xpu::XPUStream& stream); | ||
|
|
||
| void put_signal_impl_xpu( | ||
| uint32_t** signal_pads, | ||
| int dst_rank, | ||
| int channel, | ||
| int rank, | ||
| int world_size, | ||
| size_t timeout_ms, | ||
| at::xpu::XPUStream& stream); | ||
|
|
||
| void wait_signal_impl_xpu( | ||
| uint32_t** signal_pads, | ||
| int src_rank, | ||
| int channel, | ||
| int rank, | ||
| int world_size, | ||
| size_t timeout_ms, | ||
| at::xpu::XPUStream& stream); | ||
| } // namespace c10d::symmetric_memory | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.