Long ago, when I first started using asyncio and read through its documentation, I noticed that it kept repeating this warning:
Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done. For reliable “fire-and-forget” background tasks, gather them in a collection:
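Here is a minimal runnable sketch of the pattern that note recommends: a set that holds strong references, plus a done callback that discards each task once it finishes. The docs' own example likewise uses a background_tasks set and background_tasks.discard. The worker/main functions and the squaring workload are illustrative assumptions.

```python
import asyncio

# Strong references to in-flight "fire-and-forget" tasks.
background_tasks: set[asyncio.Task] = set()

async def worker(i: int, out: list[int]) -> None:
    await asyncio.sleep(0.01)
    out.append(i * i)

async def main() -> list[int]:
    out: list[int] = []
    for i in range(5):
        task = asyncio.create_task(worker(i, out))        # fire and forget...
        background_tasks.add(task)                        # ...but keep a strong reference
        task.add_done_callback(background_tasks.discard)  # and drop it once the task is done
    while background_tasks:  # stand-in for "the program keeps doing other work"
        await asyncio.sleep(0.01)
    return sorted(out)

print(asyncio.run(main()))  # [0, 1, 4, 9, 16]
```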
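Yet our own code never did this, so I didn't take it to heart. I figured it was about as routine as a compiler warning, until the boomerang came back and hit me in the face…
Recently I have been integrating our previously developed KVCacheTransfer/KVCacheStore framework into vllm. BTW: the changes we made to vllm turned out to be surprisingly close to Nvidia Dynamo's.
After finishing the initial development, I casually spun up a long-running stability test and went for a run. When I got back, I found a coredump: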
(gdb) py-bt
Traceback (most recent call first):
  <built-in method abort of module object at remote 0x7f628dda4360>
  File "/mnt/workspace/zhanyi/vllm/vllm/v1/engine/disagg.py", line 529, in _loop_on_ex
    os.abort()
  Garbage-collecting
  File "/usr/lib/python3.10/logging/__init__.py", line 1084, in flush
    self.stream.flush()
  File "/usr/lib/python3.10/logging/__init__.py", line 1104, in emit
    self.flush()
  File "/usr/lib/python3.10/logging/__init__.py", line 968, in handle
    self.emit(record)
  File "/usr/lib/python3.10/logging/__init__.py", line 1696, in callHandlers
    hdlr.handle(record)
  File "/usr/lib/python3.10/logging/__init__.py", line 1634, in handle
    self.callHandlers(record)
  File "/usr/lib/python3.10/logging/__init__.py", line 1624, in _log
    self.handle(record)
  File "/usr/lib/python3.10/logging/__init__.py", line 1477, in info
    self._log(INFO, msg, args, **kwargs)
  File "/mnt/workspace/zhanyi/vllm/vllm/v1/engine/disagg.py", line 481, in _do_add_req
    logger.info("disagg add request. reqid=%s retry=%s peer=%s:%s:%s:%s",
  File "/mnt/workspace/zhanyi/vllm/vllm/v1/engine/disagg.py", line 496, in _add_request
    peer_id, first_token = await self._do_add_req(req, idx)
  File "/mnt/workspace/zhanyi/vllm/vllm/v1/engine/disagg.py", line 35, in wrapper
    return await async_func(*args, **kwargs)
  File "/mnt/workspace/zhanyi/vllm/vllm/v1/engine/disagg.py", line 520, in _d_thread_main
    loop.run_until_complete(state._main())
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 973, in _bootstrap
    self._bootstrap_inner()
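The abort itself is deliberate. As I mentioned in 令人哽咽的 python asyncio 调试 (roughly, "a tear-jerking python asyncio debugging session"), I strongly dislike exceptions that are not handled explicitly, so for any unhandled exception I prefer to abort the process outright: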
def _loop_on_ex(_loop, context):
    logger.exception("loop ex. context=%s", context)
    os.abort()

loop.set_exception_handler(_loop_on_ex)
# Log output that accompanied this coredump:
#Message: 'loop ex. context=%s'
#Arguments: {'message': 'Task was destroyed but it is pending!', 'task': <Task pending name='Task-22' coro=<_kill_me_if_exception.<locals>.wrapper() done, defined at /mnt/workspace/zhanyi/vllm/vllm/v1/engine/disagg.py:33> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /usr/lib/python3.10/asyncio/futures.py:392]>}
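That message is produced by the Task's finalizer. When a Task that is still pending gets garbage-collected, asyncio reports it through loop.call_exception_handler(), which is how it reached _loop_on_ex. The sketch below reproduces this without vllm. It assumes a hypothetical orphan() task that awaits a Future nobody else references, and it records the message instead of aborting.

```python
import asyncio
import gc

def reproduce() -> list[str]:
    messages: list[str] = []
    loop = asyncio.new_event_loop()
    # Record only the message: storing ctx["task"] would resurrect the Task.
    loop.set_exception_handler(lambda _loop, ctx: messages.append(ctx["message"]))

    async def orphan() -> None:
        # Hypothetical stand-in: this Future is referenced only by our own frame.
        await loop.create_future()

    async def main() -> None:
        loop.create_task(orphan())  # the returned Task is deliberately dropped
        await asyncio.sleep(0)      # let orphan() run until it suspends
        gc.collect()                # the Task <-> Future cycle is unreachable now

    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
    return messages

print(reproduce())
```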
This has always been my stance.
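But this exception stack was not what I intended! Of course, the moment I saw Task was destroyed but it is pending!, I remembered what the asyncio docs keep nagging about: always save a reference to your Task. What puzzled me, though, was this: if failing to keep a reference really can kill a Task early, why has our existing code never had a problem? So I distilled a small demo that mimics the characteristics of our code: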
import threading, asyncio, sys, weakref, gc, time

def weakref_cb(objid: int, wref):
    print(f">>>>>>>>>>>> weakref_cb: {objid=}")

async def task1_main():
    tsk = asyncio.current_task()
    tsk_objid = id(tsk)
    wref = weakref.ref(tsk, lambda r: weakref_cb(tsk_objid, r))
    print(f"{tsk_objid=} {sys.getrefcount(tsk)=}")
    del tsk
    while True:
        print(f"Ok! Ok! I am Fine!")
        await asyncio.sleep(10)

def f():
    global loop
    print(f"{id(loop)=}")
    coro = task1_main()
    # Fire and forget: the returned concurrent.futures.Future is not kept.
    asyncio.run_coroutine_threadsafe(coro, loop)
    return

# loop = uvloop.new_event_loop()
# asyncio.get_event_loop() with no current loop is deprecated on newer Pythons.
loop = asyncio.new_event_loop()
# gc.set_debug(gc.DEBUG_STATS | gc.DEBUG_COLLECTABLE | gc.DEBUG_UNCOLLECTABLE | gc.DEBUG_SAVEALL | gc.DEBUG_LEAK)
# gc.collect()

def thread_main():
    global loop
    print("new thread")
    loop.run_forever()

def g():
    f()

print(f"{id(loop)=}")
t = threading.Thread(target=thread_main, args=(), name="test")
t.start()
time.sleep(3)
g()
print("f done")
while True:
    gc.collect()
    time.sleep(1)
t.join()
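Running it: hey! The Task really is never collected!!! So I used GDB to trace Python's GC. As is well known, CPython places a PyGC_Head structure holding GC bookkeeping in front of every GC-tracked PyObject. Given a PyObject address, we can subtract sizeof(PyGC_Head), which is 16 on 64-bit builds. A GDB hardware watchpoint on writes to those 16 bytes then gives a rough picture of how the object is handled during GC.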
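Here is a small helper that illustrates that arithmetic. It is a sketch under stated assumptions: a 64-bit, default (GIL) CPython build, where id() returns the object's address and PyGC_Head is two machine words. gc_head_addr and gdb_watch_cmd are hypothetical names. The printed command is meant to be pasted into a gdb session attached to the same process.

```python
import ctypes
import gc

# Assumption: default (GIL) CPython, where every GC-tracked object is preceded
# by a PyGC_Head of two machine words (_gc_next, _gc_prev). Free-threaded
# builds lay objects out differently.
PYGC_HEAD_SIZE = 2 * ctypes.sizeof(ctypes.c_void_p)  # 16 on 64-bit

def gc_head_addr(obj) -> int:
    """Address of obj's PyGC_Head; relies on id(obj) being its address."""
    if not gc.is_tracked(obj):
        raise ValueError("object is not tracked by the cyclic GC")
    return id(obj) - PYGC_HEAD_SIZE

def gdb_watch_cmd(obj) -> str:
    """A gdb watchpoint command covering obj's whole PyGC_Head."""
    return f"watch *(char (*)[{PYGC_HEAD_SIZE}]){gc_head_addr(obj):#x}"

if __name__ == "__main__":
    import asyncio

    async def show() -> None:
        # e.g. print this from inside task1_main for the current Task
        print(gdb_watch_cmd(asyncio.current_task()))

    asyncio.run(show())
```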
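Doing this on the Task turned up the answer. In asyncio, asyncio.sleep creates a TimerHandle that is stored in loop._scheduled, and that TimerHandle holds a Future. When the Task then executes await asyncio.sleep, Task.__wakeup, a method bound to the Task itself, is stored in Future._callbacks! So the chain loop._scheduled → TimerHandle → Future → Future._callbacks → Task keeps a sleeping Task reachable.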
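You can check this chain directly. The sketch assumes CPython's default event loop and reads the private attributes loop._scheduled, Handle._args and Future._callbacks. These are implementation details that may change, and uvloop does not have them. sleeper, pinned_by_timer and main are hypothetical names.

```python
import asyncio

async def sleeper() -> None:
    # Same shape as our long-running tasks.
    while True:
        await asyncio.sleep(10)

def pinned_by_timer(loop: asyncio.AbstractEventLoop, task: asyncio.Task) -> bool:
    """Walk loop._scheduled -> TimerHandle -> Future -> Future._callbacks -> Task."""
    for handle in loop._scheduled:
        for arg in handle._args or ():  # cancelled handles have _args = None
            if isinstance(arg, asyncio.Future):
                for callback, _ctx in arg._callbacks or ():
                    # The wakeup callback is bound to the Task (__self__).
                    if getattr(callback, "__self__", None) is task:
                        return True
    return False

async def main() -> bool:
    loop = asyncio.get_running_loop()
    task = loop.create_task(sleeper())
    await asyncio.sleep(0)  # let sleeper() suspend inside asyncio.sleep(10)
    return pinned_by_timer(loop, task)  # asyncio.run() cancels sleeper() afterwards

print(asyncio.run(main()))  # expected: True on CPython's default event loop
```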
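This also explains why our existing code never hit the problem: all of its Tasks have the shape while True: await asyncio.sleep(...). The code I wrote for the vllm integration, however, never awaits asyncio.sleep; it only awaits StreamReader.readexactly(). The asyncio implementation shows that nothing behind that call holds a reference to the Task. The waiter Future is referenced only by the StreamReader, and StreamReaderProtocol keeps only a weak reference to its StreamReader. With task1_main changed as follows, the Task is destroyed prematurely: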
async def task1_main():
    # (tsk / wref setup and prints unchanged from the demo above)
    reader, _ = await asyncio.open_connection('hidva.com', '80')
    while True:
        print(f"Ok! I am alive!")
        await reader.readexactly(300)
(zyvllm) $ python asyncio_task3.py
id(loop)=139913803743184
new thread
id(loop)=139913803743184
f done
tsk_objid=139913803450464 sys.getrefcount(tsk)=5
Ok! I am alive!
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<task1_main() done, defined at /mnt/workspace/zhanyi/tmp/asyncio-task/asyncio_task3.py:11> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /usr/lib/python3.10/asyncio/futures.py:392]>
^CTraceback (most recent call last):
  File "/mnt/workspace/zhanyi/tmp/asyncio-task/asyncio_task3.py", line 51, in <module>
    time.sleep(1)
KeyboardInterrupt
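Here is a self-contained sketch that reproduces this contrast without a network peer. sleeps_forever mirrors our original tasks. waits_on_private_future is an assumed stand-in for readexactly() that awaits a Future only its own frame references. Both names are hypothetical, and the exception handler is silenced only to keep the output clean.

```python
import asyncio
import gc
import weakref

async def sleeps_forever() -> None:
    # Pinned by loop._scheduled -> TimerHandle -> Future -> wakeup callback.
    while True:
        await asyncio.sleep(10)

async def waits_on_private_future() -> None:
    # Assumption: models readexactly(); nothing outside this frame holds `fut`.
    fut = asyncio.get_running_loop().create_future()
    await fut

async def main() -> dict[str, bool]:
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(lambda _loop, _ctx: None)  # silence "destroyed but pending"
    refs = {
        "sleep": weakref.ref(loop.create_task(sleeps_forever())),
        "future": weakref.ref(loop.create_task(waits_on_private_future())),
    }
    await asyncio.sleep(0)  # let both tasks reach their first await
    gc.collect()
    # asyncio.run() cancels whatever is still alive when main() returns.
    return {name: ref() is not None for name, ref in refs.items()}

print(asyncio.run(main()))  # expected: {'sleep': True, 'future': False}
```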
The fix
The fix is simple. Back when I got burned in 令人哽咽的 python asyncio 调试, I added a decorator to every function used as a task main:
# All coroutines used as the main of task must apply this decorator.
def kill_me_if_exception(async_func):
    async def task_main(*args, **kwargs):
        try:
            return await async_func(*args, **kwargs)
        except Exception:
            logger.exception("async_func: ex")
            os.abort()
    return task_main

@kill_me_if_exception
async def an_task_main():
    pass
A small tweak to kill_me_if_exception is all it takes:
# Mind thread safety~
_bladkvt_disagg_running_task: set[asyncio.Task] = set()

# All coroutines used as the main of task must apply this decorator.
def kill_me_if_exception(async_func):
    async def task_main(*args, **kwargs):
        task = asyncio.current_task()
        # Keep a strong reference: the event loop only holds weak references to tasks.
        _bladkvt_disagg_running_task.add(task)
        task.add_done_callback(_bladkvt_disagg_running_task.discard)
        try:
            return await async_func(*args, **kwargs)
        except Exception:
            logger.exception("async_func: ex")
            os.abort()
    return task_main
Postscript
One more thing: did you notice that the output of python asyncio_task3.py contains none of the lines weakref_cb should have printed?! This is because:
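One plausible explanation, offered as a hedged reading, is a rule in CPython's cyclic GC: when a weak reference is itself part of the garbage being collected, its callback is not invoked. In asyncio_task3.py, wref is a local of task1_main. It therefore lives in the coroutine frame, which becomes garbage together with the Task. The sketch below contrasts a weakref stored inside a garbage cycle with one kept reachable from outside. Node and collect_cycle are hypothetical names.

```python
import gc
import weakref

class Node:
    """Any GC-tracked object will do; Node stands in for the Task."""

def collect_cycle(store_weakref_inside: bool) -> list[str]:
    fired: list[str] = []
    node = Node()
    node.self_ref = node  # cyclic garbage once the local names are dropped
    wr = weakref.ref(node, lambda _r: fired.append("callback ran"))
    if store_weakref_inside:
        node.wr = wr  # like wref living in the coroutine frame: part of the trash
        keep = None
    else:
        keep = wr     # the weakref stays reachable from this frame
    del node, wr
    gc.collect()
    del keep
    return fired

print(collect_cycle(True), collect_cycle(False))
```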