如之前我关于 vllm pd 分离实现系列文章所示:
在完成 PD 分离第一版实现之后, 紧接着便是与 vllm 其他模块, 比如 DP, 的适配. DP 适配开发过程不在本文提起, 出乎我意料的是适配工作出奇的简单.
P.S. 这里 vllm kvt 方案便是我前几篇文章提到的我们之前完成的 PD 分离实现方案.
唯一不满意的是目前 vllm DP sync 方案, 需要请求下发模块感知 dp 存在, 该模块在将请求下发给其中一个 EngineCore 时还需要给其他 dp EngineCore 下发一次 wave. 这个很麻烦! 主要是我们 PD 分离逻辑也需要给 EngineCore 下发请求, 这意味着我们 PD 分离逻辑也需要类似的逻辑, 比如唤醒其他 dp EngineCore. 本来我以为 PR 17546: API-server scaleout with many-to-many server-engine comms 能救我一下, 但发现并不行.
vllm dp sync 实现
为什么需要 wave? 我们用 sglang 这幅图解释一下:
如图所示, 在 DP size = 4 情况下, 当 Frontend 接收到一个新的请求 R, 其按照指定策略选择一个 DP rank 进行处理, 假设这里选择了 DP rank 0, 即请求 R 进入了 batch1; 这里 batch2/batch3/batch4 仍然是空. 那么将只有 DP rank 0 开始 forward, 并且其将在 All Gather 集合通信那里阻塞, 直至 batch2/3/4 也开始对应的 forward.
vllm 对此解法, 我们通过如下示意图描述. 如下所示我对此的主要槽点: Frontend 需要感知到 DP 实现细节, 并做必需的唤醒工作.
sequenceDiagram
participant F as Frontend
participant C0 as Core0
participant C1 as Core1
participant C2 as Core2
participant C3 as Core3
Note Over F: 收到了新请求 R, 选择 C0;
Note Over C0,C3: block on input_queue.get
par Frontend 唤醒
F ->> C0: ADD(R)
F ->> C1: START_DP_WAVE
F ->> C2: START_DP_WAVE
F ->> C3: START_DP_WAVE
end
par Core 纷纷从 input_queue.get 中被唤醒
C0 ->> C0: 从 input_queue.get 中返回
C1 ->> C1: 从 input_queue.get 中返回
C2 ->> C2: 从 input_queue.get 中返回
C3 ->> C3: 从 input_queue.get 中返回
end
par Core 开始 Step1
C0 ->> C0: Step, 处理 R
C1 ->> C1: Step, dummy
C2 ->> C2: Step, dummy
C3 ->> C3: Step, dummy
end
par Core 开始 Step2, R 已经结束.
C0 ->> C0: Step, 下发 finished R 给 worker 清理状态<br/>这时不会发起 forward
C0 ->> C0: Step, dummy
C1 ->> C1: Step, dummy
C2 ->> C2: Step, dummy
C3 ->> C3: Step, dummy
end
Note Over C0,C3: AllGather 收集每个 Core 是否有 alive req
Note Over C0,C3: 大家都没活了, block on input_queue.get
my vllm dp sync 实现
具体源码如 commit 所示, 这里简单介绍一下. 我们这里实现中有一个 Coordinator 与多个 Participant, 每个 dp rank 对应着一个 Participant.
Coordinator 组件, 其维护着如下两个状态:
_step
, Coordinator 视角看到的 step._parts
, 记录了所有参与者相关信息:
class _ParticipantInfo(
msgspec.Struct,
array_like=True, # type: ignore[call-arg]
omit_defaults=True, # type: ignore[call-arg]
gc=False): # type: ignore[call-arg]
dprank: int = 0
host: str = ''
port: int = 0
# 在 Coordinator 视角下, 参与者最近一次同步的 step.
# 后来发现没必要, 因为 coordinator 总是原子性更新自身 _step 以及所有
# 参与者 step, 即这些值总是取值相同的.
# step: int = 0
P.S. STEP! 看到 step 我又想起来之前看 NCCL 代码的折磨了, NCCL 真是特别喜欢用 step, 如下代码中 3 个 step 表示 3 种不同类型的 step…
proxyOp->nsteps = nstepsPerLoop * nLoops * chunkSteps;
Coordinator 作为一个 rpc server, 运行在 dp rank 0 EngineCore 进程某个后台线程中, 其提供如下 rpc 接口:
reg_part(info)
, 参与者 participant 在启动时调用该 rpc 接口注册自身.
assert self._parts[info.dprank] is None
# Coordinator 将 participant 发来的自身信息保存在 _parts 中.
self._parts[info.dprank] = info
start_step(step)
, 当参与者 participant 准备发起一次 model forward 时调用该接口. 此时 step 为 participant 视角看到的值.
if step > self._step:
# _do_start_step 后台运行, 其逻辑不会阻塞 _start_step rpc 返回.
self._loop.create_task(self._do_start_step(step))
@_kill_me_if_exception
async def _do_start_step(self, step: int):
if step <= self._step:
return
# coordinator 首先使用参与者 step 更新自身.
# 之后将 coordinator.step 通过调用所有 participant start_step rpc
# 通知 participant coordinator 视角最新的 step.
# 这里有个优化手段: coordinator 可以前进一大步, 避免 participant/coordinator
# 有过多的交互.
self._step = step + 24
futs = [self._part_start_step(idx) for idx in range(self._dpsize)]
await asyncio.gather(*futs)
return
这里需要注意的是 EngineCore 在仅有 finished req 也会调用 execute_model(scheduler_output), 但此时 scheduler_output.total_num_scheduled_tokens 为 0, gpu model runner 并不会发起一次 forward. 这时 engine core 不能前进自身 step.
Participant 组件, 其维护着两个状态 local_step, coord_step; local_step 为 engine core 正在进行或者已经完成的 step. coord_step 为最近一次从 coordinator 同步来的 coordinator.step 值. 若 local_step < coord_step, 此时意味着其他 dp rank 有请求待执行, 当前 engine core 需要继续发起一次 forward.
def engines_running(self) -> bool:
# Core Thread
return self._local_step < self._coord_step
def _process_engine_step(self):
if self.scheduler.has_unfinished_requests():
# engine core 有请求待执行, 即 engine core 需要发起一次 forward.
self._dppart.new_step()
elif self.engines_running():
# 此时当前 engine core 没有请求待执行, 但其他 engine core 有请求待执行.
self._dppart.new_step()
# 所以需要一次 dummy batch.
self.execute_dummy_batch()
super()._process_engine_step()
return
def new_step(self):
# Core Thread
self._local_step += 1
if self._local_step <= self._coord_step:
return
# 调用 coordinator start_step rpc 通知 coordinator.
_start_step_rpc_sync(self._coord_host, self._coord_port,
self._local_step)
return
如上所示 Coordinator 会在自身 step 前进时调用 Participant start_step rpc 通知 participant:
# Participant rpc server 运行在一个新线程中, 其收到 coordinator start_step rpc
# 之后通过 core.input_queue 传递这一信息.
#
# 若当前 engine_core 阻塞在 input_queue.get 中, 这时还会唤醒 engine core.
def start_step_threadsafe(self, step: int):
req = (None, '_start_step', (step, ))
self.input_queue.put_nowait((EngineCoreRequestType.UTILITY, req))
return
def _start_step(self, step: int):
# 该函数调用发生在 Core Thread 中.
if step > self._coord_step:
self._coord_step = step
return
如下我们模拟跑上一遍, 就很清晰了:
sequenceDiagram
participant F as Frontend
participant C0 as Core0
participant P0 as Participant0
participant C as coordinator
participant C1 as Core1
participant P1 as Participant1
participant C2 as Core2
participant P2 as Participant2
participant C3 as Core3
participant P3 as Participant3
Note Over F: 收到了新请求 R, 选择 C0;
Note Over C0,C3: block on input_queue.get
F ->> C0: ADD(R)
C0 ->> C0: 从 input_queue.get 中被唤醒.
C0 -->> P0: new_step, local_step=1, <br/>local_step >coord_step, 需要 rpc
P0 -->> C: rpc start_step(step=1)
C -->> C: coord.step = 1<br/>先不考虑 +24 优化
par coord 调用所有 participant start_step rpc
C -->> P0: start_step(step=1)
C -->> P1: start_step(step=1)
C -->> P2: start_step(step=1)
C -->> P3: start_step(step=1)
end
par participant 通知对应 engine core
P0 -->> C0: input_queue('_start_step', 1)
C0 -->> C0: coord_step = 1 <br/>这时 C0 在进行 execute_model<br/>并不会立即执行.
P1 -->> C1: input_queue('_start_step', 1)
C1 -->> C1: coord_step = 1 <br/>engines_running=True<br/>从 input_queue.get 被唤醒
C1 -->> P1: new_step, local_step=1 <br/>local_step == coord_step, 没有 rpc
P2 -->> C2: input_queue('_start_step', 1)
C2 -->> C2: coord_step = 1 <br/>engines_running=True<br/>从 input_queue.get 被唤醒
C2 -->> P2: new_step, local_step=1 <br/>local_step == coord_step, 没有 rpc
P3 -->> C3: input_queue('_start_step', 1)
C3 -->> C3: coord_step = 1 <br/>engines_running=True<br/>从 input_queue.get 被唤醒
C3 -->> P3: new_step, local_step=1 <br/>local_step == coord_step, 没有 rpc
end
par 开始 Step 1
C0 ->> C0: Step, 处理 R <br/> R 在这 1 个 step 结束
C1 ->> C1: Step, dummy
C2 ->> C2: Step, dummy
C3 ->> C3: Step, dummy
end
par run_busy_loop 下一次迭代:
C0 ->> C0: local_step==coord_step=1, engines_running=False<br/>has finished req, execute_model <br/> 但由于没有 forward, 所以不会 new_step.
C1 ->> C1: local_step==coord_step=1, engines_running=False<br/>
C2 ->> C2: local_step==coord_step=1, engines_running=False<br/>
C3 ->> C3: local_step==coord_step=1, engines_running=False<br/>
end
Note Over C0,C3: 大家都没活了, block on input_queue.get
这样多简单啊, 不需要 AllGather 集合通信, 所有 DP 相关细节都封装在 DPEngineCoreProc 内部, 外部不需要感知. 如实现所示我们移除了 EngineCoreRequest, CoreClient, EngineCoreOutputs 等其他模块与 DP Wave 相关的代码. 整个实现复用了 disagg 提供的大部分实施, 真正干活的, 与 DP wave 相关逻辑不足 30 行.
另外, 当 DP rank 都位于一台机器上时, 我们也可以采用共享内存实现, 这样整体实现效率会更高一点, 不再需要 rpc 了. 但目前我们线上这类需要 DP 模型的部署一般都会跨机, 所以这一优化价值不大.
后记
我发现推理框架都好喜欢用 zmq 啊.. 但其实在我目前接触到的一些场景中, 如果用 rpc 来解耦的话实现应该会更简单一点.