Can mscclpp's DeviceSyncer Really Sync?

Posted by w@hidva.com on December 1, 2024

DeviceSyncer is an unassuming utility class, and its implementation really is short, just the following 15-odd lines of code:

MSCCLPP_DEVICE_INLINE void sync(int blockNum, int64_t maxSpinCount = 100000000) {
  unsigned int maxOldCnt = blockNum - 1;
  __syncthreads();
  if (blockNum == 1) return;
  if (threadIdx.x == 0) {
    __threadfence();
    unsigned int tmp = preFlag_ ^ 1;
    if (atomicInc(&count_, maxOldCnt) == maxOldCnt) {
      atomicStore(&flag_, tmp, memoryOrderRelaxed);
    } else {
      POLL_MAYBE_JAILBREAK((atomicLoad(&flag_, memoryOrderRelaxed) != tmp), maxSpinCount);
    }
    preFlag_ = tmp;
  }
  __syncthreads();
}

I hadn't planned to pay much attention to this utility class and meant to just skim it, but! I got burned by atomicInc(&count_, maxOldCnt) == maxOldCnt! I subconsciously read this form of atomicInc as an atomicFetchAdd, i.e. atomically execute count_ += maxOldCnt and return the previous value of count_, and with that (mis)understanding I puzzled over the sync implementation in vain… Anyway, I finally learned that atomicInc's actual semantics are:

reads the 32-bit word old located at the address address in global or shared memory, computes ((old >= val) ? 0 : (old+1)), and stores the result back to memory at the same address. These three operations are performed in one atomic transaction. The function returns old.
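
So atomicInc is a wrapping counter, not a fetch-add. A rough sketch of the quoted semantics (illustration only; the helper name is mine, and the real operation executes as a single atomic transaction in hardware):

__device__ unsigned int atomicInc_semantics(unsigned int* address, unsigned int val) {
  // The real atomicInc performs all three steps as one atomic transaction.
  unsigned int old = *address;                 // read
  *address = (old >= val) ? 0 : (old + 1);     // wrap-around increment
  return old;                                  // return the old value
}

With val = maxOldCnt = blockNum - 1, count_ cycles through 0 .. blockNum-1: exactly one of the blockNum arriving blocks observes old == maxOldCnt, and that same call resets count_ to 0 for the next sync round. That is what makes the == maxOldCnt test a "last one to arrive" check.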

With that cleared up, my curiosity about DeviceSyncer.sync was well and truly piqued. Thanks to my history of battles with the C++ memory model, the first thing that interested me after coming to GPUs was their memory model, so I had already carefully read the memory-model-related parts of the Parallel Thread Execution ISA Version 8.5 and the CUDA documents. Armed with that knowledge, I set out to pick apart the DeviceSyncer.sync implementation.

P.S. For the history of my battles with the C++ memory model, see the "从 fetch_add(0) 说起" ("Starting from fetch_add(0)") series of posts, spanning 2016 to 2022…

Semantics

First, the semantics of DeviceSyncer.sync. Its doc comment says:

Synchronize all threads inside a kernel. Guarantee that all previous work of all threads in cooperating blocks is finished.

Its typical usage looks like this:

__device__ mscclpp::DeviceSyncer deviceSyncer;
__global__ void kernel() {
  do_something();
  deviceSyncer.sync(gridDim.x);
  // #1
}
kernel<<<4, 1024>>>();

When execution reaches #1, it is guaranteed that all 4 * 1024 threads across the 4 thread blocks have finished do_something(), so it looks like a scaled-up equivalent of __syncthreads().

Question 1

Having shaken off the shadow cast by atomicInc, my next question was: why isn't preFlag_ wrapped in an atomic? Doesn't this obviously lead to concurrent reads and writes, i.e. a data race?!

     block_0                                                block_1
devsyncer.sync(2)                                       devsyncer.sync(2)
  __threadfence()                                         __threadfence()
  read preFlag_ get 0                                     read preFlag_ get 0
  tmp = 1                                                 tmp = 1
  atomicInc(&count_, 1), count_ is 1, return 0            atomicInc(&count_, 1), count_ is 0, return 1
  while(atomicLoad(&flag_) != tmp);                       flag_ = tmp = 1
  preFlag_ = 1 #3                                         preFlag_ = 1

devsyncer.sync(2)                                      devsyncer.sync(2)
  __threadfence()                                         __threadfence()
  read preFlag_ get 1                                     read preFlag_ get 1 #1
  tmp = 0                                                 tmp = 0
  atomicInc(&count_, 1), count_ is 0, return 1 #5         atomicInc(&count_, 1), count_ is 1, return 0 #4
  flag_ = tmp = 0
  preFlag_ = tmp = 0 #2

For example, can the read at #1 see the value written at #2? In fact it cannot, because #1 happens-before #4, #4 happens-before #5, and #5 happens-before #2. But I still don't like it: #1 can see #3, so the data race really does exist! If it were me, I would probably wrap preFlag_ in relaxed atomics, along the lines of the sketch below.
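
A minimal sketch of that variant (hypothetical; not mscclpp's actual code), assuming the same atomicLoad/atomicStore helpers used for flag_ also work on preFlag_:

// Hypothetical thread-0 section of sync(): preFlag_ is only touched through
// relaxed atomics, so the (benign) race on it also disappears from the formal
// memory model. Every block writes the same tmp in a given round, which is
// why the original plain accesses are harmless in practice.
unsigned int tmp = atomicLoad(&preFlag_, memoryOrderRelaxed) ^ 1;
if (atomicInc(&count_, maxOldCnt) == maxOldCnt) {
  atomicStore(&flag_, tmp, memoryOrderRelaxed);
} else {
  POLL_MAYBE_JAILBREAK((atomicLoad(&flag_, memoryOrderRelaxed) != tmp), maxSpinCount);
}
atomicStore(&preFlag_, tmp, memoryOrderRelaxed);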

Question 2

The second question: can DeviceSyncer really achieve synchronization? Consider the following example:

                         x_0 = x_1 = 0
        block_0                          block_1
        x_0 = 33                         x_1 = 77
 device_syncer.sync(2)              device_syncer.sync(2) #2
        read x_1 #1

Can the device_syncer.sync at #2 guarantee that x_1 = 77 is visible at #1? After all, DeviceSyncer.sync contains nothing but a fence plus some relaxed memory orders. As the CUDA documentation puts it:

Memory fence functions only affect the ordering of memory operations by a thread; they do not, by themselves, ensure that these memory operations are visible to other threads

To answer that, I wrote the demo below. The key points are to place the two thread blocks on two different SMs, fix the block_0 and block_1 roles, run the logic above, and count how many times block_0 reads x_1 = 0.

// demo
#include <stdlib.h>
#include <iostream>
#include <mscclpp/concurrency_device.hpp>

#define HIDVA_CUDATHROW(cmd)                                                                                       \
  do {                                                                                                               \
    cudaError_t err = cmd;                                                                                           \
    if (err != cudaSuccess) {                                                                                        \
      std::cerr << #cmd << ":" << cudaGetErrorString(err) << std::endl;                                             \
      abort();                                                                                                       \
    }                                                                                                                \
  } while (false)

// Return the ID of the SM the calling thread is running on.
__device__ unsigned get_smid(void) {
  unsigned ret = 0;
  asm("mov.u32 %0, %%smid;" : "=r"(ret));  // '%' must be doubled in inline PTX
  return ret;
}

__managed__ unsigned int bad_apple_cnt = 0;

#define SM2_VAR_EXPECTED 77
#define SM1_VAR_EXPECTED 33
unsigned int sm1_var_expected = SM1_VAR_EXPECTED;
unsigned int sm2_var_expected = SM2_VAR_EXPECTED;
__device__ mscclpp::DeviceSyncer dev_syncer;

// The st.weak/ld.weak PTX helpers below are deliberately *weak* accesses:
// they carry no ordering or visibility guarantees of their own, so any
// cross-SM visibility observed after the sync must come from DeviceSyncer
// itself. `.wb` writes back through the cache hierarchy; `.ca` may be
// served from the per-SM L1 cache.
__device__ unsigned int sm1_var = SM1_VAR_EXPECTED;
__device__ void store_sm1_var(unsigned int val) {
  asm("st.weak.global.wb.u32  [sm1_var], %0;" :: "r"(val));
}

__device__ unsigned int sm2_var = SM2_VAR_EXPECTED;
__device__ void store_sm2_var(unsigned int val) {
  asm("st.weak.global.wb.u32  [sm2_var], %0;" :: "r"(val));
}

__device__ unsigned load_sm1_var() {
  unsigned ret = 0;
  asm("ld.weak.global.ca.u32 %0, [sm1_var];" : "=r"(ret));
  return ret;
}

__device__ unsigned load_sm2_var() {
  unsigned ret = 0;
  asm("ld.weak.global.ca.u32 %0, [sm2_var];" : "=r"(ret));
  return ret;
}

__managed__ unsigned int smids[2] = {0, 0};

__global__ void zy_sync_test()
{
  if (load_sm1_var() != SM1_VAR_EXPECTED) {
    __brkpt();
  }
  if (load_sm2_var() != SM2_VAR_EXPECTED) {
    __brkpt();
  }

  if (threadIdx.x == 0) {
    // Record which SM this block landed on; block 0 then overwrites sm1_var
    // and block 1 overwrites sm2_var, so after the sync each block can check
    // whether the other block's store has become visible to it.
    atomicExch(&smids[blockIdx.x], get_smid());
    if (blockIdx.x == 0) {
      // sm1_var = SM2_VAR_EXPECTED;
      store_sm1_var(SM2_VAR_EXPECTED);
    } else {
      // sm2_var = SM1_VAR_EXPECTED;
      store_sm2_var(SM1_VAR_EXPECTED);
    }
  }

  dev_syncer.sync(gridDim.x);

  if (threadIdx.x == 0) {
    // After the sync, each block reads the variable the *other* block wrote;
    // still seeing the initial value means that store is not visible here.
    if (blockIdx.x == 0) {
      if (load_sm2_var() == SM2_VAR_EXPECTED) {
        atomicAdd(&bad_apple_cnt, 1);
        // __brkpt();
      }
    } else {
      if (load_sm1_var() == SM1_VAR_EXPECTED) {
        atomicAdd(&bad_apple_cnt, 1);
        // __brkpt();
      }
    }
  }
  __syncthreads();
}

int main(int argc, char** argv)
{
  int device;
  cudaDeviceProp prop;
  cudaSetDevice(3);
  cudaGetDevice(&device);
  cudaGetDeviceProperties(&prop, device);

  int max_sm = 0;
  int num_blks = 0;
  cudaOccupancyMaxActiveBlocksPerMultiprocessor(
      &num_blks,
      zy_sync_test,
      1024,
      max_sm);
  std::cout << "device=" << device << ",maxThreadsPerMultiProcessor=" << prop.maxThreadsPerMultiProcessor
            << ",sharedMemPerMultiprocessor=" << prop.sharedMemPerMultiprocessor
            << ",max_sm=" << max_sm
            << ",num_blks=" << num_blks << std::endl;

  mscclpp::DeviceSyncer syncer = {};
  HIDVA_CUDATHROW(cudaMemcpyToSymbol(dev_syncer, &syncer, sizeof(mscclpp::DeviceSyncer)));

  unsigned long i = 0;
  unsigned long same_sm = 0;
  while (true) {
    HIDVA_CUDATHROW(cudaMemcpyToSymbol(sm1_var, &sm1_var_expected, sizeof(sm1_var_expected)));
    HIDVA_CUDATHROW(cudaMemcpyToSymbol(sm2_var, &sm2_var_expected, sizeof(sm2_var_expected)));
    zy_sync_test<<<2, 1024, max_sm>>>();  // max_sm is still 0, so no dynamic shared memory
    HIDVA_CUDATHROW(cudaDeviceSynchronize());
    HIDVA_CUDATHROW(cudaGetLastError());

    if (smids[0] == smids[1]) {
      same_sm += 1;
    }
    ++i;
    if (i % 10000 == 0) {
      std::cout << "Do it again!" << i << ",bad_apple=" << bad_apple_cnt << ",same_sm=" << same_sm << std::endl;
    }
  }

  return 0;
}

And then, to my surprise, it was bad_apple=0 the whole time! Once bitten, twice shy: this time I looked up the exact semantics of every function first, and found:

void __threadfence(); is equivalent to cuda::atomic_thread_fence(cuda::memory_order_seq_cst, cuda::thread_scope_device).

memory_order_seq_cst!!! So I manually changed the thread fence in DeviceSyncer.sync to a relaxed one:

diff --git a/include/mscclpp/concurrency_device.hpp b/include/mscclpp/concurrency_device.hpp
index 6614b91..dc8d636 100644
--- a/include/mscclpp/concurrency_device.hpp
+++ b/include/mscclpp/concurrency_device.hpp
@@ -29,7 +29,8 @@ struct DeviceSyncer {
     if (blockNum == 1) return;
     if (threadIdx.x == 0) {
       // Need a `__threadfence()` before to flip `flag`.
-      __threadfence();
+      // __threadfence();
+      cuda::atomic_thread_fence(cuda::memory_order_relaxed, cuda::thread_scope_device);
       unsigned int tmp = preFlag_ ^ 1;
       if (atomicInc(&count_, maxOldCnt) == maxOldCnt) {
         atomicStore(&flag_, tmp, memoryOrderRelaxed);

and observed the bad apples very clearly:

Do it again!10000, bad_apple=20000
Do it again!20000, bad_apple=40000
Do it again!30000, bad_apple=60000
Do it again!40000, bad_apple=80000
Do it again!50000, bad_apple=100000
Do it again!60000, bad_apple=120000
Do it again!70000, bad_apple=140000
Do it again!80000, bad_apple=160000
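
Two bad apples per iteration, one per block, on every single run. So the answer to the title question is yes: DeviceSyncer really can sync, but the guarantee rests entirely on __threadfence() being a seq_cst fence at thread_scope_device; the count_/flag_ handshake alone, with its relaxed atomics, provides no visibility whatsoever.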