Source code for sipi.flow

"""
    Simple Flow tools
"""

[docs]def coroutine(func): """ Decorator for coroutine creation & bootstrapping """ def _(*args, **kwargs): cr=func(*args, **kwargs) cr.next() return cr _.__name__=func.__name__ return _
[docs]def build_pipeline(blocks, pipe=None): """ Build a pipeline from the list of blocks The first element of the list must correspond to the first `block` of the pipeline. Each `block` is a tuple `(handler_function, name)`. The `handler_function` must accept 1 tuple parameter of the following signature: `(next_block, (context, msg))`. """ rblocks=reversed(blocks) for block in rblocks: handler, name=block if pipe is None: pipe=_processor((None, handler, name)) else: pipe=_processor((pipe, handler, name)) return pipe ### PRIVATE ################################################# #############################################################
def _processor((nxt, handler, name)): def loop(): try: while True: ctx, msg=(yield) msg=handler(nxt, (ctx, msg)) if msg is not None: nxt.send(msg) except KeyboardInterrupt: raise except Exception,e: try: nxt.send((ctx, ("error", e))) except: nxt.send((None, ("error", e))) l=loop() l.next() return l