DeviceSyncer is an unassuming utility class whose implementation is genuinely short, roughly the 15 lines below:
MSCCLPP_DEVICE_INLINE void sync(int blockNum, int64_t maxSpinCount = 100000000) {
  unsigned int maxOldCnt = blockNum - 1;
  __syncthreads();
  if (blockNum == 1) return;
  if (threadIdx.x == 0) {
    // Need a `__threadfence()` before to flip `flag`.
    __threadfence();
    unsigned int tmp = preFlag_ ^ 1;
    if (atomicInc(&count_, maxOldCnt) == maxOldCnt) {
      atomicStore(&flag_, tmp, memoryOrderRelaxed);
    } else {
      POLL_MAYBE_JAILBREAK((atomicLoad(&flag_, memoryOrderRelaxed) != tmp), maxSpinCount);
    }
    preFlag_ = tmp;
  }
  // Sync again because only thread 0 checked whether the flag was flipped.
  __syncthreads();
}
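For context: the three members sync() touches are plain unsigned ints, and POLL_MAYBE_JAILBREAK is, roughly, a spin loop bounded by maxSpinCount. A paraphrased sketch of the surrounding struct (the field names come from the code above; the comments are my reading, not upstream's):

struct DeviceSyncer {
  // ... sync() as above ...
  unsigned int flag_;     // flipped 0 <-> 1 once per completed barrier round
  unsigned int count_;    // arrival counter that atomicInc wraps over [0, blockNum - 1]
  unsigned int preFlag_;  // flag_'s value after the previous round, re-written by every block
};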
I hadn't planned to pay much attention to this utility class and only meant to skim it, but! I got burned by atomicInc(&count_, maxOldCnt) == maxOldCnt! My subconscious read this form of atomicInc as an atomicFetchAdd, i.e. atomically perform count_ += maxOldCnt and return the previous value of count_, and with that misreading I puzzled over the sync implementation to no end... Anyway, I eventually learned that the semantics of atomicInc are:
reads the 32-bit word old located at the address address in global or shared memory, computes ((old >= val) ? 0 : (old+1)), and stores the result back to memory at the same address. These three operations are performed in one atomic transaction. The function returns old.
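In other words, atomicInc is a wrapping counter, not a fetch-add. A toy kernel to drive the point home (the names demo_atomic_inc, ctr, out are mine, not from mscclpp):

__global__ void demo_atomic_inc(unsigned int* ctr, unsigned int* out) {
  // With val = 3 and 8 threads, the 8 atomic transactions return
  // 0, 1, 2, 3, 0, 1, 2, 3 (in whatever order they serialize), and *ctr ends
  // back at 0. Fetch-add reasoning would instead predict 0, 3, 6, 9, ...
  out[threadIdx.x] = atomicInc(ctr, 3u);
}
// launch: demo_atomic_inc<<<1, 8>>>(ctr, out);

So atomicInc(&count_, maxOldCnt) makes count_ cycle through 0 .. blockNum - 1: the last block to arrive is the one that reads old == maxOldCnt (resetting count_ to 0 as a side effect), and it is the one that flips flag_.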
My curiosity about DeviceSyncer.sync was piqued as well. Given my history of wrestling with the C++ memory model, the first thing that interested me after I started working with GPUs was their memory model, so back then I had carefully read the memory-model sections of Parallel Thread Execution ISA Version 8.5 and the CUDA documents. Armed with that bit of knowledge, I set out to deconstruct the DeviceSyncer.sync implementation.
P.S. For my history of wrestling with the C++ memory model, see the "从 fetch_add(0) 说起" ("Starting from fetch_add(0)") series of posts, spanning 2016 to 2022...
Semantics
First, let's look at the semantics of DeviceSyncer.sync. Its doc comment says:
Synchronize all threads inside a kernel. Guarantee that all previous work of all threads in cooperating blocks is finished.
Its usage looks roughly like this:
__device__ mscclpp::DeviceSyncer deviceSyncer;

__global__ void kernel() {
  do_something();
  deviceSyncer.sync(gridDim.x);
  // #1
}

kernel<<<4, 1024>>>();
When the execution flow reaches #1, it is guaranteed that all 4 thread blocks, i.e. all 4 * 1024 threads, have finished do_something(), so it looks like a scaled-up version of __syncthreads().
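(For comparison: the stock CUDA way to get a grid-wide barrier is cooperative groups, as in the minimal sketch below. It assumes the kernel is launched with cudaLaunchCooperativeKernel on a device that reports cooperativeLaunch support; part of DeviceSyncer's appeal, as I understand it, is that it avoids the cooperative-launch requirement, at the cost of requiring all blocks to be resident simultaneously.)

#include <cooperative_groups.h>
namespace cg = cooperative_groups;

__global__ void kernel_cg() {
  do_something();
  cg::this_grid().sync();  // grid-wide barrier, same role as deviceSyncer.sync(gridDim.x)
  // #1
}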
Question 1
Once I got out from under the shadow cast by atomicInc, my next question was: why isn't preFlag_ wrapped in an atomic? Doesn't the concurrent read/write obviously make this a data race?!
block_0                                            block_1
devsyncer.sync(2)                                  devsyncer.sync(2)
__threadfence()                                    __threadfence()
read preFlag_, get 0                               read preFlag_, get 0
tmp = 1                                            tmp = 1
atomicInc(&count_, 1), count_ is 1, return 0       atomicInc(&count_, 1), count_ is 0, return 1
while (atomicLoad(&flag_) != tmp);                 flag_ = tmp = 1
preFlag_ = 1                                 #3    preFlag_ = 1

devsyncer.sync(2)                                  devsyncer.sync(2)
__threadfence()                                    __threadfence()
read preFlag_, get 1                               read preFlag_, get 1           #1
tmp = 0                                            tmp = 0
atomicInc(&count_, 1), count_ is 0, return 1 #5    atomicInc(&count_, 1), count_ is 1, return 0  #4
flag_ = tmp = 0
preFlag_ = tmp = 0                           #2
For example, can the read at #1 see the result of #2? It actually cannot: #1 happens-before #4 (program order within block_1), #4 happens-before #5 (block_0's atomicInc observes block_1's increment), and #5 happens-before #2 (program order within block_0). But I still don't like it: #1 can see #3, so the data race genuinely exists! If it were me, I would probably wrap preFlag_ in relaxed atomics.
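Concretely, here is a sketch of what I mean, reusing the atomicLoad / atomicStore helpers the code already applies to flag_ (my variant, not the upstream implementation):

unsigned int tmp = atomicLoad(&preFlag_, memoryOrderRelaxed) ^ 1;
if (atomicInc(&count_, maxOldCnt) == maxOldCnt) {
  atomicStore(&flag_, tmp, memoryOrderRelaxed);
} else {
  POLL_MAYBE_JAILBREAK((atomicLoad(&flag_, memoryOrderRelaxed) != tmp), maxSpinCount);
}
atomicStore(&preFlag_, tmp, memoryOrderRelaxed);

Since every block writes the same value to preFlag_ in each round, the race is benign in practice; relaxed atomics would simply make that explicit at no runtime cost.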
Question 2
The second question: does DeviceSyncer really achieve synchronization? Consider the following example:
x_0 = x_1 = 0

block_0                      block_1
x_0 = 33                     x_1 = 77
device_syncer.sync(2)        device_syncer.sync(2)   #2
read x_1                #1
Does device_syncer.sync at #2 guarantee that x_1 = 77 is visible to the read at #1? After all, DeviceSyncer.sync is built out of nothing but fences and relaxed memory orders, and as the CUDA documentation puts it:
Memory fence functions only affect the ordering of memory operations by a thread; they do not, by themselves, ensure that these memory operations are visible to other threads
To find out, I wrote the demo below. The key points are to get the two thread blocks onto two different SMs, fix the block_0 / block_1 roles, run the logic above, and count how many times block_0 reads x_1 = 0.
// demo
#include <stdlib.h>
#include <iostream>
#include <mscclpp/concurrency_device.hpp>

#define HIDVA_CUDATHROW(cmd)                                              \
  do {                                                                    \
    cudaError_t err = cmd;                                                \
    if (err != cudaSuccess) {                                             \
      std::cerr << #cmd << ":" << cudaGetErrorString(err) << std::endl;   \
      abort();                                                            \
    }                                                                     \
  } while (false)

__device__ unsigned get_smid(void) {
  unsigned ret = 0;
  // `%` must be escaped as `%%` inside inline PTX, hence `%%smid`.
  asm("mov.u32 %0, %%smid;" : "=r"(ret));
  return ret;
}
__managed__ unsigned int bad_apple_cnt = 0;

#define SM2_VAR_EXPECTED 77
#define SM1_VAR_EXPECTED 33
unsigned int sm1_var_expected = SM1_VAR_EXPECTED;
unsigned int sm2_var_expected = SM2_VAR_EXPECTED;

__device__ mscclpp::DeviceSyncer dev_syncer;

// Weak (non-atomic) loads/stores via inline PTX so the compiler cannot
// smuggle in any ordering of its own.
__device__ unsigned int sm1_var = SM1_VAR_EXPECTED;
__device__ void store_sm1_var(unsigned int val) {
  asm("st.weak.global.wb.u32 [sm1_var], %0;" :: "r"(val));
}

__device__ unsigned int sm2_var = SM2_VAR_EXPECTED;
__device__ void store_sm2_var(unsigned int val) {
  asm("st.weak.global.wb.u32 [sm2_var], %0;" :: "r"(val));
}

__device__ unsigned load_sm1_var() {
  unsigned ret = 0;
  asm("ld.weak.global.ca.u32 %0, [sm1_var];" : "=r"(ret));
  return ret;
}

__device__ unsigned load_sm2_var() {
  unsigned ret = 0;
  asm("ld.weak.global.ca.u32 %0, [sm2_var];" : "=r"(ret));
  return ret;
}

__managed__ unsigned int smids[2] = {0, 0};
__global__ void zy_sync_test() {
  // Sanity check: the host reset both variables before this launch.
  if (load_sm1_var() != SM1_VAR_EXPECTED) {
    __brkpt();
  }
  if (load_sm2_var() != SM2_VAR_EXPECTED) {
    __brkpt();
  }
  if (threadIdx.x == 0) {
    atomicExch(&smids[blockIdx.x], get_smid());
    if (blockIdx.x == 0) {
      // sm1_var = SM2_VAR_EXPECTED;
      store_sm1_var(SM2_VAR_EXPECTED);
    } else {
      // sm2_var = SM1_VAR_EXPECTED;
      store_sm2_var(SM1_VAR_EXPECTED);
    }
  }
  dev_syncer.sync(gridDim.x);
  if (threadIdx.x == 0) {
    // After the barrier each block reads the variable written by the other
    // block; still seeing the stale pre-barrier value is a "bad apple".
    if (blockIdx.x == 0) {
      if (load_sm2_var() == SM2_VAR_EXPECTED) {
        atomicAdd(&bad_apple_cnt, 1);
        // __brkpt();
      }
    } else {
      if (load_sm1_var() == SM1_VAR_EXPECTED) {
        atomicAdd(&bad_apple_cnt, 1);
        // __brkpt();
      }
    }
  }
  __syncthreads();
}
int main(int argc, char** argv) {
  int device;
  cudaDeviceProp prop;
  cudaSetDevice(3);
  cudaGetDevice(&device);
  cudaGetDeviceProperties(&prop, device);
  int max_sm = 0;
  int num_blks = 0;
  cudaOccupancyMaxActiveBlocksPerMultiprocessor(&num_blks, zy_sync_test, 1024, max_sm);
  std::cout << "device=" << device << ",maxThreadsPerMultiProcessor=" << prop.maxThreadsPerMultiProcessor
            << ",sharedMemPerMultiprocessor=" << prop.sharedMemPerMultiprocessor
            << ",max_sm=" << max_sm
            << ",num_blks=" << num_blks << std::endl;

  mscclpp::DeviceSyncer syncer = {};
  HIDVA_CUDATHROW(cudaMemcpyToSymbol(dev_syncer, &syncer, sizeof(mscclpp::DeviceSyncer)));

  unsigned long i = 0;
  unsigned long same_sm = 0;
  while (true) {
    HIDVA_CUDATHROW(cudaMemcpyToSymbol(sm1_var, &sm1_var_expected, sizeof(sm1_var_expected)));
    HIDVA_CUDATHROW(cudaMemcpyToSymbol(sm2_var, &sm2_var_expected, sizeof(sm2_var_expected)));
    zy_sync_test<<<2, 1024, max_sm>>>();
    HIDVA_CUDATHROW(cudaDeviceSynchronize());
    HIDVA_CUDATHROW(cudaGetLastError());
    if (smids[0] == smids[1]) {
      // Runs where both blocks landed on the same SM do not exercise the
      // cross-SM visibility question, so count them separately.
      same_sm += 1;
    }
    ++i;
    if (i % 10000 == 0) {
      std::cout << "Do it again!" << i << ",bad_apple=" << bad_apple_cnt << ",same_sm=" << same_sm << std::endl;
    }
  }
  return 0;
}
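For reference, a build command along these lines should work (the include path and -arch value are placeholders for your setup):

nvcc -O2 -arch=sm_80 -I/path/to/mscclpp/include -o zy_sync_test zy_sync_test.cu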
Then, to my surprise, it was bad_apple=0 the whole time! Once bitten, twice shy: this time I started by checking each function's exact semantics, and found:
void __threadfence(); is equivalent to cuda::atomic_thread_fence(cuda::memory_order_seq_cst, cuda::thread_scope_device).
memory_order_seq_cst!!! So I manually changed the DeviceSyncer.sync thread fence to a relaxed one:
diff --git a/include/mscclpp/concurrency_device.hpp b/include/mscclpp/concurrency_device.hpp
index 6614b91..dc8d636 100644
--- a/include/mscclpp/concurrency_device.hpp
+++ b/include/mscclpp/concurrency_device.hpp
@@ -29,7 +29,8 @@ struct DeviceSyncer {
     if (blockNum == 1) return;
     if (threadIdx.x == 0) {
       // Need a `__threadfence()` before to flip `flag`.
-      __threadfence();
+      // __threadfence();
+      cuda::atomic_thread_fence(cuda::memory_order_relaxed, cuda::thread_scope_device);
       unsigned int tmp = preFlag_ ^ 1;
       if (atomicInc(&count_, maxOldCnt) == maxOldCnt) {
         atomicStore(&flag_, tmp, memoryOrderRelaxed);
With the fence relaxed (a relaxed fence imposes no ordering at all, so this effectively deletes the fence; note that cuda::atomic_thread_fence comes from libcu++'s <cuda/atomic>), the bad apple showed up very clearly:
Do it again!10000, bad_apple=20000
Do it again!20000, bad_apple=40000
Do it again!30000, bad_apple=60000
Do it again!40000, bad_apple=80000
Do it again!50000, bad_apple=100000
Do it again!60000, bad_apple=120000
Do it again!70000, bad_apple=140000
Do it again!80000, bad_apple=160000