Storm.py json decode exception

Posted by w@hidva.com on December 8, 2017

事故现场如图:

异常堆栈 异常 json 串 ShellBolt.java 源码

本来因为是 ShellBolt 出现 BUG 了. 感觉像 writeBoltMsg() 时遇到了 InterruptedException 导致只写入了 "comp: nu", 然后又一次调用 writeBoltMsg() 导致错乱了.

后来发现不对. 因为这个异常只出现在我们的一个组件 C 内. C 使用了多进程, 并且 C 的子进程中调用了 storm.emit, 相关源码参见 storm.py, 摘录如下:

def readTaskIds():
    if pending_taskids:
        return pending_taskids.popleft()
    else:
        msg = readMsg()
        while type(msg) is not list:
            pending_commands.append(msg)
            msg = readMsg()
        return msg

def emit(*args, **kwargs):
    __emit(*args, **kwargs)
    return readTaskIds()

storm.emit() 会读取标准输入. 本来呢我是一直以为 storm.emit 只会写标准输出来着, 所以才会在 C 的子进程中调用了 storm.emit(). 后来发现其会读取标准输入之后, 机智的我一下子就觉得是这里导致的异常. 不过其实还是有一点疑惑就是 storm.emit() 虽然会读取标准输入, 但都是以 readMsg() 为单位读取的, 所以在子进程中调用 storm.emit() 最多会造成一些 tuple 由于被子进程读取导致无法被 C 感知导致丢失, 但不应该会污染数据啊!

但不管怎样, 当我把子进程的 storm.emit() 替换为 multiprocessing.Queue.put(), 然后在 C 中通过 storm.emit(Queue.get()) 之后确实是不会出现异常了, 所以问题应该被修复了吧==