Some of the libraries I use are built around a bunch of coroutines and make liberal use of async functions. To call them from an Invoke task, I can either wrap my async function in a dirty-but-works way (specifying the arguments three times):
```python
import asyncio
from invoke import task

@task
def mytask(c, someopt=False):
    # Drive the real async implementation to completion on an event loop.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(_mytask(c, someopt))

async def _mytask(c, someopt=False):
    pass
```
Or I can do something even hackier that intercepts the `inspect.getargspec` call and hands back the original function's argspec, so Invoke is happy that the task has a context object and knows about its other arguments:
```python
import asyncio
import functools
import inspect

from invoke import task


class ArgSpecPatch:
    """Return a pre-registered argspec for wrapper functions, defer otherwise."""

    def __init__(self, original):
        self.registry = {}
        self.original = original

    def register(self, func, wrapped):
        # Remember the original function's argspec, keyed by its wrapper.
        self.registry[wrapped] = self.original(func)

    def __call__(self, *args, **kwargs):
        if not (len(args) == 1 and len(kwargs) == 0):
            return self.original(*args, **kwargs)
        func = args[0]
        if func not in self.registry:
            return self.original(*args, **kwargs)
        return self.registry[func]


inspect.getargspec = ArgSpecPatch(inspect.getargspec)


def atask(func):
    @functools.wraps(func)
    def _wrap(*args, **kwargs):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(func(*args, **kwargs))

    # Make getargspec(_wrap) report the original coroutine function's argspec.
    inspect.getargspec.register(func, _wrap)
    return task(_wrap)


@atask
async def mytask(c, option=False):
    pass
```
Is there an easier place to insert a check for whether the function passed to `@task` is a coroutine function and, if so, exhaust it in an event loop? I'm not asking for any task concurrency/parallelism (like #63); the tasks could just be run serially.
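Roughly the kind of check I have in mind (a sketch only, under my own assumptions; `run_body` is a made-up helper name, not part of Invoke's actual call path):

```python
import asyncio
import inspect


def run_body(body, *args, **kwargs):
    # Hypothetical sketch: call the task body as usual...
    result = body(*args, **kwargs)
    # ...and if the body was an async def, its call returned a coroutine
    # object, so exhaust it on an event loop before handing the result back.
    if inspect.iscoroutine(result):
        result = asyncio.get_event_loop().run_until_complete(result)
    return result
```

If a check like this lived inside Invoke itself, the async function could be decorated with `@task` directly, so the existing argspec introspection should keep working without any of the wrapping above.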