令人哽咽的 python asyncio 调试

Posted by w@hidva.com on November 23, 2024

笔者近来一直在参与某个不可名状的项目, 从上到下都很重视, 每天都会拉起 Scrum 会对齐进度, 虽然压力满满但也斗志昂昂. 然后最近项目出现了一个 bug, 此 bug 严重阻塞了各位同学后续进一步验证工作, 已经连续几天在 Scrum 会上被点到. 于是我开始接手这个 bug 的分析定位工作, bug 的修复到很简单, 就是普通的多线程并发问题. 时间来到了当天晚上 7 点左右, 我已经完成了 bug 代码修复工作, 准备凑着晚间 GPU 资源闲置进行一轮压测验证. 然后不幸的事情开始发生了, 压测卡住了! 不再响应请求了! 看了下系统状态 C++ 层流量没有进来, 那只能是阻塞在 Python 层了. 当时就是一阵心虚. 要是阻塞在 C++, 那我真的有十八般武器等着她. 但是 Python, Python 实现细节这块我真的还不太熟. 当时第一念头就是按照之前排查 C++ coroutine 无响应的经验: 先恶补下 Python 实现细节, 之后再使用 GDB 挂住不响应的 Python 进程, 运用 Python 实现细节找出阻塞的各个 asyncio coroutine, 每个 coroutine 的堆栈, 以及相关对象各个字段的状态等. 当时也想着要不先杀掉卡住的进程, 用 pdb 启动看下, 但确实怕问题不再出现或者压测到半夜的时候出现了, 丢失了这宝贵的卡住现场…

总之, 在 4 小时之后我确实分析出问题所在, 一个应该存在的 asyncio coroutine 消失了, 这个 asyncio coroutine 是请求处理链路比较重要的一环, 其从输入 Queue 中拉取请求, 进行一些预处理工作, 之后塞到另外一组 Queue 中, 她消失了确实系统就会不响应了. 消失的原因也很直观, 异常没有处理导致的呗. 然后翻了下日志, 就发现那条异常堆栈就那样懒洋洋地躺在日志中…懊悔! 懊悔!! 为什么!!! 我没有第一时间想到翻翻日志!!!! 仅以此文纪念下我被耽误的 4 个小时!

P.S. 在你尝试使用本文提供的工具之前, 请记得先看下日志, 这毕竟只是 Python, 有一个很成熟的 runtime, 事情可能没有想象中的那种复杂.

定在那

DebuggingWithGdb 中, Python 提供了一些 gdb python script, 但这些 script 只是类似 pretty printer, 并不能用来控制 Python 执行流. 而我第一个需求是能让执行流定在一个指定的函数, 指定的行数中, 类似 gdb break 命令, 之后便可以在此处利用 PyFrameObjectPtr.iter_locals 等工具查看一些变量的状态了.

pygdb

pygdb 风格非常像我之前 jegdb 那款工具, 使用 jegdb 来调试内存相关 crash.

# pygdb.py, 适用于 python 3.10.X
import gdb
import sys
pyg = sys.modules['__main__']


class PyFuncBreakpoint(gdb.Breakpoint):
  def __init__(self, funcname):
    self.bf = funcname
    super().__init__('_PyEval_EvalFrameDefault')
  def stop(self):
    frame = gdb.selected_frame()
    f = frame.read_register('rsi').cast(gdb.lookup_type('PyFrameObject').pointer())
    frame = pyg.PyFrameObjectPtr(f)
    frame_name = frame.co_name.proxyval(set())
    if frame_name == self.bf:
      return True
    else:
      return False


class PyLineBreakpoint(gdb.Breakpoint):
  def __init__(self, fp, lineno):
    self.lineno = lineno
    f = gdb.Value(fp).cast(gdb.lookup_type('PyFrameObject').pointer())
    self.frame = pyg.PyFrameObjectPtr(f)
    super().__init__(f'((PyFrameObject*){fp}).f_lasti', gdb.BP_WATCHPOINT)
  def stop(self):
    lineno = self.frame.co.addr2line(int(self.frame.field('f_lasti'))*2)
    # print(f"{lineno=}")
    return lineno >= self.lineno


def frame_print_tacktrace(frame):
  frame.print_traceback()
  while True:
    back_f = frame.field('f_back')
    if int(back_f) == 0:
      break
    back_f = pyg.PyFrameObjectPtr(back_f)
    back_f.print_traceback()
    frame = back_f


def task_print_traceback(task_pointer):
  task_p = gdb.Value(task_pointer).cast(gdb.lookup_type('TaskObj').pointer())
  coro_p = task_p.dereference()['task_coro'].cast(gdb.lookup_type('PyCoroObject').pointer())
  frame_p = coro_p.dereference()['cr_frame'].cast(gdb.lookup_type('PyFrameObject').pointer())
  frame = pyg.PyFrameObjectPtr(frame_p)
  task_name = pyg.PyUnicodeObjectPtr(task_p.dereference()['task_name'].cast(gdb.lookup_type('PyUnicodeObject').pointer())).proxyval(set())
  print(f"{task_name=}")
  frame_print_tacktrace(frame)


def get_field(obj, field):
  for k, v in obj.get_attr_dict().iteritems():
    k = k.proxyval(set())
    if k == field:
      return v
  return None


# print _all_tasks
def print_all_tasks(all_task_p):
  all_tasks = pyg.HeapTypeObjectPtr(gdb.Value(all_task_p).cast(gdb.lookup_type('PyObject').pointer()))
  data = get_field(all_tasks, 'data')
  for t_weak_ref in data:
    t_weak_ref = t_weak_ref._gdbval.cast(gdb.lookup_type('PyWeakReference').pointer())
    task_p = int(t_weak_ref.dereference()['wr_object'])
    print(f"{task_p=}")
    task_print_traceback(int(t_weak_ref.dereference()['wr_object']))

id!

之前在 C++ 项目中, 我就比较喜欢将一些状态对象的地址输出到日志中, 这样在 corefile 或者 gdb alive process 时都省了一堆功夫来找到这些具有关键状态的对象. 在 python 中可以同样如此:

asyncio

如我之前在 为什么协程: 事后追踪 提到的:

还有一个可做的事情是, 针对用户态运行时的特点, 利用 gdb python script 能力扩展 gdb, 比如异步堆栈的展示,

本节演示下如何利用 pygdb print_all_tasks/frame_print_tacktrace 能力来展示当前系统中共有哪些 task, 每个 task 对应的堆栈是什么样子的.

# 本例子即 "为什么协程" 中提到的 asyncio server
def try_get_all_tasks():
  try:
    import asyncio.tasks
    return id(asyncio.tasks._all_tasks)
  except Exception:  # try failed
    return 0


if __name__ == '__main__':
  # 如 id 段所示, 记得把一些关键状态对象的地址输出一下.
  print(f"{try_get_all_tasks()=}")
  loop.create_task(server_main(), name="server")
  loop.run_forever()

# python why-coro-server.py
# try_get_all_tasks()=140320073961184
(gdb) pi
>>> pygdb.print_all_tasks(140320073961184)
task_p=140320083962912
task_name='server'
  File "why-coro-server.py", line 15, in server_main
task_p=140320081800016
task_name="client-('127.0.0.1', 42628)"
  File "why-coro-server.py", line 22, in client_main
task_p=140320081800432
task_name="client-('127.0.0.1', 42674)"
  File "why-coro-server.py", line 22, in client_main
>>>

本文一开始提到的问题正是利用 print_all_tasks 发现那个协程不在了…

demo

这是本文演示所用 demo.

import asyncio
from aiohttp import web

class Server:

  def __init__(self):
    self.q = asyncio.Queue()

  async def api_process_main(self, request):
    data = await request.json()
    future = asyncio.get_event_loop().create_future()
    print("this")
    print("is")
    print("a")
    print("print")
    print("statement")
    await self.q.put((data, future))
    resp = await future
    return web.json_response(resp)

  async def process_request(self, data):
    await asyncio.sleep(1)
    response = {'status': 'processed', 'original_data': data}
    return response

  async def worker_main(self):
    while True:
      data, future = await self.q.get()
      try:
        result = await self.process_request(data)
        future.set_result(result)
      except Exception as e:
        future.set_exception(e)

  def create_web_app(self):
    loop = asyncio.get_event_loop()
    loop.create_task(self.worker_main())

    app = web.Application()
    app.router.add_post('/process', self.api_process_main)
    return app


def try_get_all_tasks():
  try:
    import asyncio.tasks
    return id(asyncio.tasks._all_tasks)
  except Exception:  # try failed
    return 0


if __name__ == '__main__':
  server = Server()
  loop = asyncio.get_event_loop()
  print(f"{id(server)=} {id(loop)=} {try_get_all_tasks()=}")
  app = server.create_web_app()
  web.run_app(app, host='127.0.0.1', port=8080, loop=loop)

参考文章