如我们之前在 为什么协程 中提到过的:
内核中的公平调度器提供的是抢占着调度能力, 在线程调度到具体的 CPU 上执行一段时间之后, 即使这个线程对应的逻辑执行流仍有指令可以继续执行, 内核也会强行挂起当前执行线程, 转而从可运行队列中选择下一个线程来调度其到 CPU 上运行. 而位于线程中的用户态调度器提供的是协作式调度, 她仅在每个协程逻辑执行流主动让出对线程的占有时才有机会调度其他协程执行. 所以经常会出现的一个问题是, 某个协程占据线程时间过长, 导致处于可执行队列中的其他协程得不到运行时机, 这种情况在 asyncio 这种 M:1 对应关系下更为严重. 所以这里事前防御便是要求每一位业务开发要记得协作.
可观测性的一个重要目的就是确保协程真的如期在进行协作. 根据过往经验结合 Linux 内核调度系统指标设计, 我个人倾向于增加如下几个指标:
- 协程从可运行到真正运行的时延分布.
- 可运行队列中协程数目.
- 用户态运行时, 即 asyncio loop, 处于繁忙的时间段.
- 用户态运行时, 即 asyncio loop, 处于空闲的时间段.
同样根据过往经验, 这些指标大部分时刻都是处于正常状态的, 但我还是喜欢把她们显式展示出来, 只是因为不喜欢两眼摸黑的感觉. 如果是从 0 构建一个用户态运行时, 这些指标的添加会很简单, 在相应处打点就行了. 但这是 python, 作为一个业务方我们不期望还要维护着一个自定义的用户态运行时, 所以倾向于通过利用 python 高度灵活性以及 asyncio 自身扩展机制来完成这些指标的实现.
实现
具体的实现逻辑真的很简单:
class _MySelector(selectors.DefaultSelector):
def __init__(self):
super().__init__()
# self._enter_hunger: Optional[int] = None # time.monotonic_ns
self._leave_hunger: int = time.monotonic_ns()
def select(self, timeout=None):
if timeout is not None and timeout <= 0:
# 此时是 busy polling, 不属于 hunger.
return super().select(timeout)
now = time.monotonic_ns()
busy_ns = now - self._leave_hunger
self._busy_metric.observe(busy_ns)
ret = super().select(timeout)
self._leave_hunger = time.monotonic_ns()
self._idle_metric.observe(self._leave_hunger - now)
return ret
# run queue
class _RunQueue:
def __init__(self):
# int is submit timestamp, monotonic_ns
self._data: Deque[Tuple[Optional[int], asyncio.Handle]] = collections.deque()
def clear(self):
self._data.clear()
def __len__(self):
n = len(self._data)
self._runnable_metric.observe(n)
return n
def append(self, handle: asyncio.Handle):
now = time.monotonic_ns()
self._data.append((now, handle))
def popleft(self) -> asyncio.Handle:
submit_ns, handle = self._data.popleft()
self._wait_metric.observe(time.monotonic_ns() - submit_ns)
return handle
class _MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def new_event_loop(self):
selector = _MySelector()
loop = asyncio.SelectorEventLoop(selector)
if hasattr(loop, '_ready'):
if isinstance(loop._ready, collections.deque):
if len(loop._ready) <= 0:
origin_ready_id = id(loop._ready)
origin_ready_rc = sys.getrefcount(loop._ready)
loop._ready = _RunQueue()
logger.info(f"_MyEventLoopPolicy Hacked. {id(loop)=} {id(loop._ready)=} {origin_ready_id=} {origin_ready_rc=}")
return loop
asyncio.set_event_loop_policy(_MyEventLoopPolicy())
整个实现唯一值得担忧的点是 _RunQueue
, 她的接口定义完全是依据了 asyncio.BaseEventLoop 的实现, 比较黑了一点, 但确实也找不到更具有兼容性的改法了==, 我这里确认了下 _RunQueue
与 python v3.8 ~ v3.14 都是兼容的. 还有个值得一提的点是当前 loop.call_soon_threadsafe
就是直接调用的 loop._ready.append
, 这是因为 collections.deque.append
是直接用 C 实现的, 其执行期间都是持有着 GIL 的, 所以不再需要额外的锁保护. 我们目前 _RunQueue.append
实现是线程安全的.
对性能的影响: 在我们的业务场景下测试, 对业务性能数据没有任何影响.
P.S. 这套机制刚开始集成就帮我们发现了一个 bug: