事故现场如图:
本来因为是 ShellBolt 出现 BUG 了. 感觉像 writeBoltMsg()
时遇到了 InterruptedException
导致只写入了 "comp: nu"
, 然后又一次调用 writeBoltMsg() 导致错乱了.
后来发现不对. 因为这个异常只出现在我们的一个组件 C 内. C 使用了多进程, 并且 C 的子进程中调用了 storm.emit
, 相关源码参见 storm.py, 摘录如下:
def readTaskIds():
if pending_taskids:
return pending_taskids.popleft()
else:
msg = readMsg()
while type(msg) is not list:
pending_commands.append(msg)
msg = readMsg()
return msg
def emit(*args, **kwargs):
__emit(*args, **kwargs)
return readTaskIds()
即 storm.emit()
会读取标准输入. 本来呢我是一直以为 storm.emit
只会写标准输出来着, 所以才会在 C 的子进程中调用了 storm.emit()
. 后来发现其会读取标准输入之后, 机智的我一下子就觉得是这里导致的异常. 不过其实还是有一点疑惑就是 storm.emit()
虽然会读取标准输入, 但都是以 readMsg()
为单位读取的, 所以在子进程中调用 storm.emit()
最多会造成一些 tuple 由于被子进程读取导致无法被 C 感知导致丢失, 但不应该会污染数据啊!
但不管怎样, 当我把子进程的 storm.emit()
替换为 multiprocessing.Queue.put()
, 然后在 C 中通过 storm.emit(Queue.get())
之后确实是不会出现异常了, 所以问题应该被修复了吧==