asynkit: A toolkit for Python coroutines
This module provides some handy tools for those wishing to have better control over the
way Python's asyncio module does things.
- Helper tools for controlling coroutine execution, such as CoroStart and Monitor
- Utility classes such as GeneratorObject
- Coroutine helpers such as coro_iter() and the awaitmethod() decorator
- Helpers to run async code from non-async code, such as await_sync() and aiter_sync()
- Scheduling helpers for asyncio, and extended event-loop implementations
- Eager execution of Tasks
- Experimental support for Priority Scheduling of Tasks
- Other experimental features such as task_interrupt()
- Limited support for anyio and trio.
Installation
pip install asynkit
Coroutine Tools
eager() - lower latency IO
Did you ever wish that your coroutines started right away, and only returned control to
the caller once they become blocked? Like the way the async and await keywords work in the C# language?
Now they can. Just decorate or convert them with asynkit.eager:
@asynkit.eager
async def get_slow_remote_data():
    result = await execute_remote_request()
    return result.important_data

async def my_complex_thing():
    # kick off the request as soon as possible
    future = get_slow_remote_data()
    # The remote execution may now already be in flight. Do some work taking time
    intermediate_result = await some_local_computation()
    # wait for the result of the request
    return compute_result(intermediate_result, await future)
By decorating your function with eager, the coroutine will start executing right away and
control will return to the calling function as soon as it suspends, returns, or raises
an exception. In case it is suspended, a Task is created and returned, ready to resume
execution from that point.
Notice how, in either case, control is returned directly back to the calling function, maintaining synchronous execution. In effect, conventional code calling order is maintained as much as possible. We call this depth-first-execution.
This allows you to prepare and dispatch long running operations as soon as possible while still being able to asynchronously wait for the result.
asynkit.eager can also be used directly on the returned coroutine:
import asyncio
import asynkit

log = []

async def test():
    log.append(1)
    await asyncio.sleep(0.2)  # some long IO
    log.append(2)

async def caller(convert):
    del log[:]
    log.append("a")
    future = convert(test())
    log.append("b")
    await asyncio.sleep(0.1)  # some other IO
    log.append("c")
    await future

# do nothing
asyncio.run(caller(lambda c: c))
assert log == ["a", "b", "c", 1, 2]

# Create a Task
asyncio.run(caller(asyncio.create_task))
assert log == ["a", "b", 1, "c", 2]

# eager
asyncio.run(caller(asynkit.eager))
assert log == ["a", 1, "b", "c", 2]
eager() is actually a convenience function, invoking either coro_eager() or func_eager() (see below) depending on context.
Decorating your function makes sense if you always intend
to await its result at some later point. Otherwise, just apply it at the point
of invocation in each such case.
coro_eager(), func_eager()
coro_eager() is the magic coroutine wrapper providing the eager behaviour:
- It copies the current context.
- It initializes a CoroStart() object for the coroutine, starting it in the copied context.
- If it subsequently is done(), it returns CoroStart.as_future(); otherwise it creates and returns a Task (using asyncio.create_task by default).
The result is an awaitable which can be either directly awaited or passed
to asyncio.gather(). The coroutine is executed in its own copy of the current context,
just as would happen if it were directly turned into a Task.
func_eager() is a decorator which automatically applies coro_eager()
to the coroutine returned by an async function.
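The three steps listed for coro_eager() can be sketched using only the standard library. This is a rough illustration of the idea and not asynkit's implementation: the names eager_sketch() and _continue() are hypothetical, and cancellation and other edge cases are ignored.

```python
import asyncio
import contextvars
import types


@types.coroutine
def _continue(coro, yielded):
    """Hypothetical helper: re-yield what `coro` yielded, then keep driving it."""
    while True:
        try:
            value = yield yielded
        except BaseException as exc:
            try:
                yielded = coro.throw(exc)
            except StopIteration as stop:
                return stop.value
        else:
            try:
                yielded = coro.send(value)
            except StopIteration as stop:
                return stop.value


def eager_sketch(coro):
    """Start `coro` now; return a Future if it finished, else a Task."""
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()        # 1. copy the current context
    try:
        yielded = ctx.run(coro.send, None)  # 2. run until the first suspension
    except StopIteration as stop:           # 3a. it returned without blocking
        future = loop.create_future()
        future.set_result(stop.value)
        return future
    except Exception as exc:                # 3a. it raised without blocking
        future = loop.create_future()
        future.set_exception(exc)
        return future

    async def resume():                     # 3b. it suspended: continue in a Task
        return await _continue(coro, yielded)

    return ctx.run(loop.create_task, resume())


async def demo():
    log = []

    async def work():
        log.append(1)
        await asyncio.sleep(0.05)
        log.append(2)
        return "result"

    log.append("a")
    awaitable = eager_sketch(work())  # work() runs up to its first sleep
    log.append("b")
    await asyncio.sleep(0.01)
    log.append("c")
    return log, await awaitable


print(asyncio.run(demo()))  # prints (['a', 1, 'b', 'c', 2], 'result')
```

The log order matches the eager case in the example above: the first part of the coroutine runs before control returns to the caller.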
await_sync(), aiter_sync() - Running coroutines synchronously
If you are writing code which should work both synchronously and asynchronously, you can now write the code fully async and then run it synchronously in the absence of an event loop. As long as the code doesn't block (await unfinished futures) and doesn't try to access the event loop, it can successfully be executed. This helps avoid writing duplicate code.
async def async_get_processed_data(datagetter):
    data = datagetter()  # an optionally async callback
    data = await data if isawaitable(data) else data
    return process_data(data)

# raises SynchronousError if datagetter blocks
def sync_get_processed_data(datagetter):
    return asynkit.await_sync(async_get_processed_data(datagetter))
This sort of code might previously have been written thus:
# A hybrid function, _may_ return an _awaitable_
def hybrid_get_processed_data(datagetter):
    data = datagetter()
    if isawaitable(data):
        # return an awaitable helper coroutine
        async def helper():
            result = await data
            return process_data(result)
        return helper()
    return process_data(data)  # duplication

async def async_get_processed_data(datagetter):
    r = hybrid_get_processed_data(datagetter)
    return await r if isawaitable(r) else r

def sync_get_processed_data(datagetter):
    r = hybrid_get_processed_data(datagetter)
    if isawaitable(r):
        raise RuntimeError("callbacks failed to run synchronously")
    return r
The above pattern, writing async methods as sync and returning async helpers, is common in library code which needs to work both in synchronous and asynchronous context. Needless to say, it is very convoluted, hard to debug and contains a lot of code duplication where the same logic is repeated inside async helper closures.
Using await_sync() it is possible to write the entire logic as async methods and
then simply fail if the code tries to invoke any truly async operations.
If the invoked coroutine blocks, a SynchronousError is raised from a SynchronousAbort exception which
contains a traceback. This makes it easy to pinpoint the location in the code where the
async code blocked. If the code tries to access the event loop, e.g. by creating a Task, a RuntimeError
will be raised.
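The core idea can be illustrated with a few lines of plain Python: drive the coroutine by hand and treat any suspension as a failure. This is only a sketch under simplifying assumptions. The names await_sync_sketch() and _Suspend are hypothetical, and a plain RuntimeError stands in for the SynchronousError that asynkit raises.

```python
class _Suspend:
    """Hypothetical awaitable that always suspends once, like unfinished IO."""

    def __await__(self):
        yield


def await_sync_sketch(coro):
    """Run `coro` to completion, or fail if it tries to suspend."""
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value  # finished without ever blocking
    coro.close()  # it suspended: clean up and report the failure
    raise RuntimeError("coroutine failed to complete synchronously")


async def add(a, b):
    return a + b


async def blocks():
    await _Suspend()


print(await_sync_sketch(add(1, 2)))  # prints 3

try:
    await_sync_sketch(blocks())
except RuntimeError as exc:
    print("blocked:", exc)
```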
The syncfunction() decorator can be used to automatically wrap an async function
so that it is executed using await_sync():
>>> @asynkit.syncfunction
... async def sync_function():
...     async def async_function():
...         return "look, no async!"
...     return await async_function()
...
>>> sync_function()
'look, no async!'
>>>
The asyncfunction() utility can be used when passing synchronous callbacks to async
code, to make them async. This, along with syncfunction() and await_sync(),
can be used to integrate synchronous code with async middleware:
@asynkit.syncfunction
async def sync_client(sync_callback):
    middleware = AsyncMiddleware(asynkit.asyncfunction(sync_callback))
    return await middleware.run()
Using this pattern, one can write the middleware completely async, make it also work for synchronous code, while avoiding the hybrid function antipattern.
aiter_sync()
A helper function is provided, which turns an AsyncIterable into
a generator, leveraging the await_sync() method:
async def agen():
    for v in range(3):
        yield v

assert list(aiter_sync(agen())) == [0, 1, 2]
This is useful if using patterns such as GeneratorObject in a synchronous
application.
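As a rough illustration of how an async iterable can be consumed without an event loop, the sketch below steps each `__anext__()` awaitable by hand. It is not asynkit's implementation: aiter_sync_sketch() is a hypothetical name, it assumes the awaitable returned by `__anext__()` has a `send()` method (true for async generators and coroutines), and it raises a plain RuntimeError if a step suspends.

```python
def aiter_sync_sketch(async_iterable):
    """Yield the items of an async iterable, failing if a step suspends."""
    iterator = async_iterable.__aiter__()
    while True:
        step = iterator.__anext__()  # awaitable producing the next item
        try:
            step.send(None)
        except StopIteration as stop:
            yield stop.value  # the step completed and produced an item
        except StopAsyncIteration:
            return  # the async iterable is exhausted
        else:
            step.close()
            raise RuntimeError("async iterator failed to complete synchronously")


async def agen():
    for v in range(3):
        yield v


print(list(aiter_sync_sketch(agen())))  # prints [0, 1, 2]
```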
CoroStart
This class manages the state of a partially run coroutine and is what powers the coro_eager() and await_sync() functions.
When initialized, it will start the coroutine, running it until it either suspends, returns, or raises
an exception. It can subsequently be awaited to retrieve the result.
Similarly to a Future, it has these methods:
- done() - Returns True if the coroutine finished without blocking. In this case, the following two methods may be called to get the result.
- result() - Returns the return value of the coroutine or raises any exception that it produced.
- exception() - Returns any exception raised, or None otherwise.
But more importantly it has these:
- __await__() - A magic method making it directly awaitable. If it has already finished, awaiting this coroutine is the same as calling result(), otherwise it awaits the original coroutine's continued execution.
- as_coroutine() - A helper which returns a proper coroutine object to await the CoroStart.
- as_future() - If done(), returns a Future holding its result, otherwise, a RuntimeError is raised.
- as_awaitable() - If done(), returns as_future(), else returns self. This is a convenience method for use with functions such as asyncio.gather(), which would otherwise wrap a completed coroutine in a Task.
In addition it has:
- aclose() - If not done(), will throw a GeneratorExit into the coroutine and wait for it to finish. Otherwise does nothing.
- athrow(exc) - If not done(), will throw the given error into the coroutine and wait for it to raise or return a value.
- close() and throw(exc) - Synchronous versions of the above, will raise RuntimeError if the coroutine does not immediately exit.
This means that a context manager such as aclosing() can be used to ensure
that the coroutine is cleaned up in case of errors before it is awaited:
# start foo() and run until it blocks
async with aclosing(CoroStart(foo())) as coro:
    ...  # do things, which may result in an error
    return await coro
CoroStart can be provided with a contextvars.Context object, in which case the coroutine will be run using that
context.
Context helper
coro_await() is a helper function to await a coroutine, optionally with a contextvars.Context
object to activate:
var1 = contextvars.ContextVar("myvar")

async def my_method():
    var1.set("foo")

async def main():
    context = contextvars.copy_context()
    var1.set("bar")
    await asynkit.coro_await(my_method(), context=context)
    # the coroutine didn't modify _our_ context
    assert var1.get() == "bar"
    # ... but it did modify the copied context
    assert context.get(var1) == "foo"
This is similar to contextvars.Context.run() but works for async functions. This function is
implemented using CoroStart.
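For comparison, this is the synchronous behaviour of contextvars.Context.run() from the standard library that the helper mirrors. The function name my_function() is made up for the illustration.

```python
import contextvars

var1 = contextvars.ContextVar("myvar")


def my_function():
    var1.set("foo")


context = contextvars.copy_context()
var1.set("bar")
context.run(my_function)  # runs synchronously inside the copied context

# the function didn't modify the calling context
assert var1.get() == "bar"
# ... but it did modify the copied context
assert context.get(var1) == "foo"
```

Context.run() only accepts a plain callable and returns when the call returns, which is why a separate helper is needed to keep a context active across the suspension points of a coroutine.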
awaitmethod()
This decorator turns the decorated method into a Generator as required for
__await__ methods, which must only return Iterator objects.
It does so by invoking the coro_iter() helper.
This makes it simple to make a class instance awaitable by decorating an async
__await__() method.
class Awaitable:
    def __init__(self, cofunc):
        self.cofunc = cofunc
        self.count = 0

    @asynkit.awaitmethod
    async def __await__(self):
        await self.cofunc()
        # remember the value to return before incrementing the counter
        result = self.count
        self.count += 1
        return result

async def main():
    async def sleeper():
        await asyncio.sleep(1)

    a = Awaitable(sleeper)
    assert (await a) == 0  # sleep once
    assert (await a) == 1  # sleep again

asyncio.run(main())
Unlike a regular coroutine (the result of calling a coroutine function), an object with an __await__
method can potentially be awaited multiple times.
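To show what the decorator saves you from writing, here is one way to get a re-awaitable object with only the standard library: a plain `__await__()` that delegates to a fresh coroutine each time. The class name Counter and the method `_run()` are hypothetical and only serve the illustration.

```python
import asyncio


class Counter:
    """Hypothetical awaitable object that can be awaited repeatedly."""

    def __init__(self):
        self.count = 0

    async def _run(self):
        await asyncio.sleep(0)
        self.count += 1
        return self.count

    def __await__(self):
        # __await__ must return an iterator, so create a new coroutine
        # on every await and delegate to its own __await__().
        return self._run().__await__()


async def main():
    c = Counter()
    return [await c, await c]


print(asyncio.run(main()))  # prints [1, 2]
```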
The method can also be a classmethod
or staticmethod:
class Constructor:
    @staticmethod
    @asynkit.awaitmethod
    async def __await__():
        await asyncio.sleep(0)
        return Constructor()

async def construct():
    return await Constructor
awaitmethod_iter()
An alternative way of creating an await method. It uses
the coro_iter() method to create a coroutine iterator. It
is provided for completeness.
coro_iter()
This helper function turns a coroutine function into an iterator. It is primarily
intended to be used by the awaitmethod_iter() function decorator.
Monitors and Generators
Monitor
A Monitor object can be used to await a coroutine, while listening for out of band messages
from the coroutine. As the coroutine sends messages, it is suspended until the caller resumes
awaiting it.
async def coro(monitor):
    await monitor.oob("hello")
    await asyncio.sleep(0)
    await monitor.oob("dolly")
    return "done"

async def runner():
    m = Monitor()
    c = coro(m)
    while True:
        try:
            print(await m.aawait(c))
            break
        except OOBData as oob:
            print(oob.data)
which will result in the output
hello
dolly
done
For convenience, the Monitor can be bound so that the caller does not have
to keep the coroutine around. Calling the monitor with the coroutine returns a BoundMonitor:
async def coro(m):
    await m.oob("foo")
    return "bar"

m = Monitor()
b = m(coro(m))
try:
    await b
except OOBData as oob:
    assert oob.data == "foo"
assert await b == "bar"
Notice how the BoundMonitor can be awaited directly, which is the same as awaiting
b.aawait(None).
The caller can pass in data to the coroutine via the aawait(data=None) method and
it will become the return value of the Monitor.oob() call in the coroutine.
Monitor.athrow() can similarly be used to raise an exception out of the Monitor.oob() call.
Neither data nor an exception can be sent the first time the coroutine is awaited,
only as a response to a previous OOBData exception.
A Monitor can be used when a coroutine wants to suspend itself, maybe waiting for some external
condition, without resorting to the relatively heavy mechanism of creating, managing and synchronizing
Task objects. This can be useful if the coroutine needs to maintain state. Additionally,
this kind of messaging does not require an event loop to be present and can be driven
using await_sync() (described above).
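The reason no event loop is needed can be seen from a bare-bones sketch of out-of-band messaging built directly on the coroutine protocol. This is not how Monitor is implemented: OOBSketch and drive() are hypothetical names, and a real Monitor delivers messages as OOBData exceptions rather than collecting them in a list.

```python
class OOBSketch:
    """Hypothetical awaitable that hands `data` to whoever drives the coroutine."""

    def __init__(self, data):
        self.data = data

    def __await__(self):
        reply = yield self  # suspend, passing this object up to the driver
        return reply        # whatever the driver sends back


async def coro():
    answer = await OOBSketch("hello")
    await OOBSketch(answer)
    return "done"


def drive(coro_obj, reply):
    """Run the coroutine by hand, answering every message with `reply`."""
    messages = []
    to_send = None
    while True:
        try:
            message = coro_obj.send(to_send)
        except StopIteration as stop:
            return messages, stop.value
        messages.append(message.data)
        to_send = reply


print(drive(coro(), "dolly"))  # prints (['hello', 'dolly'], 'done')
```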
Consider the following scenario. A parser wants to read a line from a buffer, but fails, signalling this to the monitor:
async def readline(m, buffer):
    l = buffer.readline()
    while not l.endswith("\n"):
        await m.oob(None)  # ask for more data in the buffer
        l += buffer.readline()
    return l

async def manager(buffer, io):
    m = Monitor()
    a = m(readline(m, buffer))
    while True:
        try:
            return await a
        except OOBData:
            try:
                buffer.fill(await io.read())
            except Exception as exc:
                await a.athrow(exc)
In this example, readline() is trivial, but if it were a stateful parser with hierarchical
invocation structure, then this pattern allows the decoupling of IO and the parsing of buffered data, maintaining the state of the parser while the caller fills up the buffer.
Any IO exception is sent to the coroutine in this example. This ensures that it cleans
up properly. Alternatively, aclose() could have been used:
m = Monitor()
async with aclosing(m(readline(m, buffer))) as a:
    # the aclosing context manager ensures that the coroutine is closed
    # with `await a.aclose()`
    # even if we don't finish running it.
    ...
A standalone parser can also be simply implemented by two helper methods, start() and try_await().
async def stateful_parser(monitor, input_data):
    while input_short(input_data):
        input_data += await monitor.oob()  # request more
    # continue parsing, maybe requesting more data
    return await parsed_data(monitor, input_data)

m: Monitor[Tuple[Any, bytes]] = Monitor()
initial_data = b""
p = m(stateful_parser(m, initial_data))
await p.start()  # set the parser running, calling oob()

# feed data until a value is returned
while True:
    parsed = await p.try_await(await get_more_data())
    if parsed is not None:
        break
This pattern can even be employed in non-async applications, by using
the await_sync() method instead of the await keyword to drive the Monitor.
For a more complete example, have a look at example_resp.py.
GeneratorObject
A GeneratorObject builds on top of the Monitor to create an AsyncGenerator. It is in many ways
similar to an asynchronous generator constructed using the generator function syntax.
But whereas those return values using the yield keyword,
a GeneratorObject has an ayield() method, which means that data can be sent to the generator
by anyone, and not just by using yield, which makes composing such generators much simpler.
The GeneratorObject leverages the Monitor.oob() method to deliver the ayielded data to whomever is iterating over it:
async def generator(gen_obj):
    # yield directly to the generator
    await gen_obj.ayield(1)
    # have someone else yield to it
    async def helper():
        await gen_obj.ayield(2)
    await asyncio.create_task(helper())

async def runner():
    gen_obj = GeneratorObject()
    values = [val async for val in gen_obj(generator(gen_obj))]
    assert values == [1, 2]
The GeneratorObject, when called, returns a GeneratorObjectIterator which behaves in
the same way as an AsyncGenerator object. It can be iterated over and supports the
asend(), athrow() and aclose() methods.
A GeneratorObject is a flexible way to asynchronously generate results without
resorting to Task and Queue objects. What is more, it allows this sort
of generating pattern to be used in non-async programs, via aiter_sync():
def sync_runner():
    gen_obj = GeneratorObject()
    values = [val for val in aiter_sync(gen_obj(generator(gen_obj)))]
    assert values == [1, 2]
Scheduling tools
A set of functions are provided to perform advanced scheduling of Task objects
with asyncio. They work with the built-in event loop, and also with any event loop
implementing the AbstractSchedulingLoop abstract base class, such as the SchedulingMixin
class which can be used to extend the built-in event loops.
Scheduling functions
sleep_insert(pos)
Similar to asyncio.sleep() but sleeps only for pos places in the runnable queue.
Whereas asyncio.sleep(0) will place the executing task at the end of the queue, which is
appropriate for fair scheduling, in some advanced cases you want to wake up sooner than that, perhaps
after a specific task.
task_reinsert(task, pos)
Takes a runnable task (for example just created with asyncio.create_task() or similar) and
reinserts it at a given position in the queue.
As with sleep_insert(), this can be useful to achieve
certain scheduling goals.
task_switch(task, *, insert_pos=None)
Immediately moves the given task to the head of the ready queue and switches to it, assuming it is runnable.
If insert_pos is not None, the current task will be
put to sleep at that position, using sleep_insert(). Otherwise the current task is put at the end
of the ready queue. If insert_pos == 1 the current task will be inserted directly after the target
task, making it the next to be run. If insert_pos == 0, the current task will execute before the target.
task_is_blocked(task)
Returns True if the task is waiting for some awaitable, such as a Future or another Task, and is thus not on the ready queue.
task_is_runnable(task)
Roughly the opposite of task_is_blocked(), returns True if the task is neither done() nor blocked and
awaits execution.
create_task_descend(coro)
Implements depth-first task scheduling.
Similar to asyncio.create_task() this creates a task but starts it running right away, and positions the caller to be woken
up right after it blocks. The effect is similar to using asynkit.eager() but
it achieves its goals solely by modifying the runnable queue. A Task is always
created, unlike eager, which only creates a task if the target blocks.
Runnable task helpers
A few functions are added to help working with tasks.
The following identity applies:
asyncio.all_tasks() == (
    asynkit.runnable_tasks() | asynkit.blocked_tasks() | {asyncio.current_task()}
)
runnable_tasks(loop=None)
Returns a set of the tasks that are currently runnable in the given loop.
blocked_tasks(loop=None)
Returns a set of the tasks that are currently blocked on some future in the given loop.
Event Loop tools
Also provided is a mixin for the built-in event loop implementations in Python, providing some primitives for advanced scheduling of tasks. These primitives are what is used by the scheduling functions above, and so custom event loop implementations can provide custom implementations of these methods.
SchedulingMixin mixin class
This class adds some handy scheduling functions to the event loop. They are intended to facilitate some scheduling tricks, particularly switching to tasks, which require finding items in the queue and re-inserting them at an early position. Nothing is assumed about the underlying implementation of the queue.
- queue_len() - returns the length of the ready queue
- queue_find(self, key, remove) - finds and optionally removes an element in the queue
- queue_insert_pos(self, pos, element) - inserts an element at position pos in the queue
- call_pos(self, pos, ...) - schedules a callback at position pos in the queue
Concrete event loop classes
Concrete subclasses of Python's built-in event loop classes are provided.
- SchedulingSelectorEventLoop is a subclass of asyncio.SelectorEventLoop with the SchedulingMixin
- SchedulingProactorEventLoop is a subclass of asyncio.ProactorEventLoop with the SchedulingMixin on those platforms that support it.
Event Loop Policy
A policy class is provided to automatically create the appropriate event loops.
- SchedulingEventLoopPolicy is a subclass of asyncio.DefaultEventLoopPolicy which instantiates either of the above event loop classes as appropriate.
Use this either directly:
asyncio.set_event_loop_policy(asynkit.SchedulingEventLoopPolicy())
asyncio.run(myprogram())
or with a context manager:
with asynkit.event_loop_policy():
    asyncio.run(myprogram())
Priority Scheduling
FIFO scheduling
Since the beginning, scheduling of Tasks in asyncio has always been FIFO, meaning "first-in, first-out". This is a design principle which provides a certain fairness to tasks, ensuring that all tasks run and a certain predictability is achieved with execution. FIFO is maintained in the following places:
- In the Event Loop, where tasks are executed in the order in which they become runnable
- In locking primitives (such as asyncio.Lock or asyncio.Condition) where tasks are able to acquire the lock or get notified in the order in which they arrive.
All tasks are treated equally.
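The FIFO behaviour of the standard event loop and asyncio.Lock can be observed with a short experiment that uses only the standard library. The worker names are arbitrary labels chosen for the illustration.

```python
import asyncio


async def main():
    order = []
    lock = asyncio.Lock()

    async def worker(name):
        async with lock:
            order.append(name)

    await lock.acquire()  # hold the lock so that all workers must queue up
    tasks = [
        asyncio.create_task(worker(name))
        for name in ("first", "second", "third")
    ]
    await asyncio.sleep(0)  # let each worker start and block on the lock
    lock.release()
    await asyncio.gather(*tasks)
    return order


print(asyncio.run(main()))  # prints ['first', 'second', 'third']
```

The tasks start in creation order, queue up on the lock in that order, and acquire it in that same order.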
The asynkit.experimental.priority module
- Note: This is currently an experimental feature.
In pre-emptive systems, such as scheduling of threads or processes, there is usually some sort of priority involved too,
to allow designating some tasks as more important than others, thus requiring more rapid servicing, and others as having
lower priority and thus being relegated to background tasks where other more important work is not pending.
The asynkit.experimental.priority module now allows us to do something similar.
You can define the priority of Task objects. A task defining the effective_priority() method returning
a float will get priority treatment in the following areas:
- When awaiting a PriorityLock or PriorityCondition
- When waiting to be executed by a PrioritySelectorEventLoop or a PriorityProactorEventLoop.
The floating point priority value returned by effective_priority() is used to determine the task's priority, with lower
values giving higher priority (in the same way that low values are sorted before high values).
If this method is missing, the default priority of 0.0 is assumed. The Priority enum class can be
used for some basic priority values, defining Priority.HIGH as -10.0 and Priority.LOW as 10.0.
In case of identical priority values, FIFO order is respected.
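The ordering rule (lower value first, FIFO among equal values) can be illustrated with a small heap-based queue. This is only a sketch of the rule, not the data structure asynkit uses; the class name PriorityQueueSketch is hypothetical.

```python
import heapq
import itertools


class PriorityQueueSketch:
    """Hypothetical queue: lowest priority value first, FIFO among equals."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # insertion order breaks ties

    def push(self, priority, item):
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def pop(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


queue = PriorityQueueSketch()
queue.push(0.0, "normal-1")
queue.push(10.0, "low")      # same value as Priority.LOW in the text
queue.push(-10.0, "high")    # same value as Priority.HIGH in the text
queue.push(0.0, "normal-2")

order = [queue.pop() for _ in range(len(queue))]
print(order)  # prints ['high', 'normal-1', 'normal-2', 'low']
```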
The locking primitives provided are fully compatible with the standard
locks in asyncio and also fully support the experimental task interruption feature.
PriorityTask
This is an asyncio.Task subclass which implements the effective_priority() method. It can be constructed with a priority keyword
or a priority_value attribute. It also participates in Priority Inheritance.
PriorityLock
This is an asyncio.Lock subclass which respects the priorities of any Task
objects attempting to acquire it. It also participates in Priority Inheritance.
PriorityCondition
This is an asyncio.Condition subclass which respects the priorities of any Task
objects awaiting to be woken up. Its default lock is of type PriorityLock.
DefaultPriorityEventLoop
This is an asyncio.AbstractEventLoop subclass which respects the priorities of any Task objects waiting to be executed. It also
provides all the scheduling extensions from AbstractSchedulingLoop and participates in Priority Inheritance.
This is either a PrioritySelectorEventLoop or a PriorityProactorEventLoop, both instances of the PrioritySchedulingMixin
class.
Priority Inversion
A well known problem with priority scheduling is the so-called Priority Inversion problem. This implementation addresses that by two different means:
Priority Inheritance
A PriorityTask keeps track of all the PriorityLock objects it has acquired, and a PriorityLock
keeps track of all the asyncio.Task objects waiting to acquire it. A PriorityTask's effective_priority()
method will return the highest effective_priority of any
task waiting to acquire a lock held by it. Thus, a high-priority task which starts waiting for a lock which is held by a
low-priority task will temporarily propagate its priority to that task, so that ultimately, the PrioritySchedulingMixin event
loop will ensure that the previously low-priority task is now executed with the higher priority.
This mechanism requires the co-operation of both the tasks, locks and the event-loop to properly function.
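The inheritance rule can be modelled with two small data classes. This is a simplified sketch and not the library's code: TaskSketch and LockSketch are hypothetical stand-ins, and the sketch assumes that a task's own priority_value takes part in the comparison. Since lower values mean higher priority, the "highest" priority is the minimum.

```python
from dataclasses import dataclass, field


@dataclass
class LockSketch:
    """Hypothetical lock that remembers which tasks are waiting for it."""
    waiters: list = field(default_factory=list)


@dataclass
class TaskSketch:
    """Hypothetical task that remembers which locks it holds."""
    priority_value: float
    held_locks: list = field(default_factory=list)

    def effective_priority(self):
        candidates = [self.priority_value]
        for lock in self.held_locks:
            candidates.extend(w.effective_priority() for w in lock.waiters)
        return min(candidates)  # lower value means higher priority


lock = LockSketch()
low = TaskSketch(10.0, held_locks=[lock])  # low-priority task holding the lock
high = TaskSketch(-10.0)

before = low.effective_priority()
lock.waiters.append(high)  # the high-priority task starts waiting
after = low.effective_priority()
print(before, after)  # prints 10.0 -10.0
```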
Priority Boosting
The PrioritySchedulingMixin will regularly do "queue maintenance" and will identify Tasks that have sat around in the queue for
many cycles without being executed. It will randomly "boost" the priority of these tasks in the queue, so that they have a chance
to run.
This mechanism does not require the co-operation of locks and tasks to work, and is in place as a safety mechanism in applications
where it is not feasible to replace all instances of Locks and Tasks with their priority-inheritance-aware counterparts.
How to use Priority Scheduling
To make use of Priority scheduling, you need to use either the priority scheduling event loop (e.g.
DefaultPriorityEventLoop) or a priority-aware synchronization primitive, i.e. PriorityLock
or PriorityCondition. In addition, you need Task
objects which support the effective_priority() method, such as PriorityTask.
It is possible to get priority behaviour from locks without having a priority event loop, and vice versa. But when using the priority event loop, it is recommended to use the accompanying lock and task classes which co-operate to provide priority inheritance.
A good first step, in your application, is to identify tasks
that perform background work, such as housekeeping tasks, and assign to them the Priority.LOW priority.
Subsequently you may want to identify areas of your application that require more attention than others. For example, a web application's URL handler may elect to temporarily raise the priority (change PriorityTask.priority_value) for certain endpoints to give them better response.
This is new territory and it remains to be seen how having priority scheduling in a co-operative environment such as
asyncio actually works in practice.
Coroutine helpers
A couple of functions are provided to introspect the state of coroutine objects. They
work on both regular async coroutines, classic coroutines (using yield from) and
async generators.
- coro_is_new(coro) - Returns true if the object has just been created and hasn't started executing yet.
- coro_is_suspended(coro) - Returns true if the object is in a suspended state.
- coro_is_done(coro) - Returns true if the object has finished executing, e.g. by returning or raising an exception.
- coro_get_frame(coro) - Returns the current frame object of the coroutine, if it has one, or None.
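For plain coroutine objects, the standard library offers comparable introspection through inspect.getcoroutinestate() and the cr_frame attribute. The sketch below walks a coroutine through the three states these helpers distinguish; it uses only the standard library, and _Suspend is a hypothetical awaitable used to force a suspension.

```python
import inspect


class _Suspend:
    """Hypothetical awaitable that suspends the coroutine once."""

    def __await__(self):
        yield


async def work():
    await _Suspend()
    return "finished"


coro = work()
states = [inspect.getcoroutinestate(coro)]      # new: not started yet
coro.send(None)                                 # run to the first suspension
states.append(inspect.getcoroutinestate(coro))  # suspended
frame_while_suspended = coro.cr_frame
try:
    coro.send(None)                             # run to completion
except StopIteration:
    pass
states.append(inspect.getcoroutinestate(coro))  # done

print(states)  # prints ['CORO_CREATED', 'CORO_SUSPENDED', 'CORO_CLOSED']
```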
anyio support
The library has been tested to work with anyio. However, not everything is supported on the trio backend.
Currently only the asyncio backend can be assumed to work reliably.
When using the asyncio backend, the module asynkit.experimental.anyio can be used to provide "eager"-like
behaviour to task creation. It will return an EagerTaskGroup context manager:
from asynkit.experimental.anyio import create_eager_task_group
from anyio import run, sleep

async def func(task_status):
    print("hello")
    task_status.started("world")
    await sleep(0.01)
    print("goodbye")

async def main():
    async with create_eager_task_group() as tg:
        start = tg.start(func)
        print("fine")
        print(await start)
    print("world")

run(main, backend="asyncio")
This will result in the following output:
hello
fine
world
goodbye
world
The first part of the function func is run even before calling await on the result from EagerTaskGroup.start().
Similarly, EagerTaskGroup.start_soon() will run the provided coroutine up to its first blocking point before
returning.
trio limitations
trio differs significantly from asyncio and therefore enjoys only limited support.
- The event loop is completely different and proprietary and so the event loop extensions don't work for trio.
- CoroStart when used with Task objects, such as by using EagerTaskGroup, does not work reliably with trio. This is because the synchronization primitives are not based on Future objects but rather perform Task-based actions both before going to sleep and upon waking up. If a CoroStart initially blocks on a primitive such as Event.wait() or sleep(x) it will be surprised and throw an error when it wakes up in a different Task than the one it was in when it fell asleep.
CoroStart works by intercepting a Future being passed up via the await protocol to
the event loop to perform the task scheduling. If any part of the task scheduling has happened
before this, and the continuation happens on a different Task then things may break
in various ways. For asyncio, the event loop never sees the Future object until
as_coroutine() has been called and awaited, and so if this happens in a new task, all is good.
Experimental features
Some features are currently available experimentally. They may work only on some platforms or be experimental in nature, not stable or mature enough to be officially part of the library.
Task Interruption
Methods are provided to raise exceptions on a Task. This is somewhat similar to
task.cancel() but different:
- The caller specifies the exception instance to be raised on the task.
- The target task is made to run immediately, precluding interference with other operations.
- The exception does not propagate into awaited objects. In particular, if the task is awaiting another task, the wait is interrupted, but that other task is not otherwise affected.
A task which is blocked, waiting for a future, is immediately freed and scheduled to run. If the task is already scheduled to run, i.e. it is new, or the future has triggered but the task hasn't become active yet, it is still awoken with an exception.
Please note the following cases:
- The Python library in places assumes that the only exception that can be raised out of awaitables is CancelledError. In particular, there are edge cases in asyncio.Lock, asyncio.Semaphore and asyncio.Condition where raising something else when acquiring these primitives will leave them in an incorrect state.
  Therefore, we provide a base class, InterruptError, deriving from CancelledError which should be used for interrupts in general.
  However, currently asyncio.Condition will not correctly pass on such a subclass for wait() in all cases, so a safer version, InterruptCondition, is provided.
- Even subclasses of CancelledError will be converted to a new CancelledError instance when not handled in a task, and awaited.
- These functions currently only work reliably with Task objects implemented in Python. Modern implementations often have a native "C" implementation of Task objects and they contain inaccessible code which cannot be used by the library. In particular, the Task.__step method cannot be explicitly scheduled to the event loop. For that reason, a special create_pytask() helper is provided to create a suitable Python Task instance.
- However: This library does go through extra hoops to make it usable with C Tasks. It almost works, but with two caveats:
  - CTasks which have plain TaskStepMethWrapper callbacks scheduled cannot be interrupted. These are typically tasks executing await asyncio.sleep(0) or freshly created tasks that haven't started executing.
  - The CTask's _fut_waiting member cannot be cleared from our code, so there exists a time where it can point to a valid, not-done, Future, even though the Task is about to wake up. This will make methods such as task_is_blocked() return incorrect values. It will get cleared when the interrupted task starts executing, however. All the more reason to use task_interrupt() over task_throw() since the former allows no space for code to see the task in such an intermediate state.
task_throw()
def task_throw(task: Task, exc: BaseException):
    pass
This method will make the target Task immediately runnable with the given exception
pending.
- If the Task was runnable due to a previous call to task_throw(), this will override that call and its exception.
- Because of that, this method should probably not be used directly. It is better to ensure that the target takes delivery of the exception right away, because there is no way to queue pending exceptions and they do not add up in any meaningful way. Prefer to use task_interrupt() below.
- This method will fail if the target task has a pending cancellation, that is, it is in the process of waking up with a pending CancelledError. Cancellation is currently asynchronous, while throwing exceptions is intended to be synchronous.
task_interrupt()
async def task_interrupt(task: Task, exc: BaseException):
    pass
An async version of task_throw(). When awaited, task_throw() is called,
followed by a task_switch() to the target. Once awaited, the exception
has been raised on the target task.
By ensuring that the target task runs immediately, it is possible to reason about task execution without having to rely on external synchronization primitives and the cooperation of the target task. An interrupt is never pending on the task (as a cancellation can be) and therefore it cannot cause collisions with other interrupts.
async def test():
    async def task_func():
        await asyncio.sleep(1)

    # create a pure Python Task from the coroutine and keep a reference to it
    task = create_pytask(task_func())
    await asyncio.sleep(0)
    assert task_is_blocked(task)
    await task_interrupt(task, InterruptException())
    assert task.done()  # the error has already been raised.
    try:
        await task
    except CancelledError:  # original error is substituted
        pass
    else:
        assert False, "never happens"
create_pytask()
Similar to asyncio.create_task() but will create a pure Python Task which can safely
be used as the target for task_throw() and task_interrupt(). Because of implementation
issues, regular C Task objects, as returned by asyncio.create_task(), cannot
be interrupted in all cases, in particular when doing an await asyncio.sleep(0) or
directly after having been created.
task_timeout()
This is a context manager providing a timeout functionality, similar to asyncio.timeout().
By leveraging task_throw() and a custom BaseException subclass, TimeoutInterrupt,
the logic becomes very simple and there is no unintended interaction with regular
task cancellation.