packages/shared-skills/skills/programming/references/python/async-anyio.md
Skill mandate:
import asynciois BANNED. Useimport anyioexclusively. This reference targets AnyIO 4.x (2026 Python projects).
AnyIO uses structured concurrency via task groups. A task group is an async context manager that guarantees all child tasks finish before the block exits.
start_soon — fire-and-forgetimport anyio
async def worker(n: int) -> None:
await anyio.sleep(1)
print(f"task {n} done")
async def main() -> None:
async with anyio.create_task_group() as tg:
for i in range(3):
tg.start_soon(worker, i)
print("all tasks finished")
anyio.run(main)
Signature: tg.start_soon(func, *args, name=None)
func must be a coroutine function (not a coroutine object).name is optional, for introspection/debugging.ExceptionGroup on exit.start — wait for ready signalUse when a task must initialize before the caller proceeds (e.g., starting a server and then connecting to it).
from anyio import TASK_STATUS_IGNORED, create_task_group, run
from anyio.abc import TaskStatus
async def start_server(port: int, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
listener = await anyio.create_tcp_listener(local_host="127.0.0.1", local_port=port)
task_status.started() # unblocks tg.start()
await listener.serve(handler)
async def main() -> None:
async with create_task_group() as tg:
await tg.start(start_server, 8080) # blocks until task_status.started()
# server is guaranteed ready here
async with await anyio.connect_tcp("127.0.0.1", 8080) as client:
...
run(main)
Rule of thumb:
start_soon when you don't need to know when the task is ready.start when the task must signal readiness before you continue.create_task — retrieving return values (AnyIO 4.14+)async def add(x: int, y: int) -> int:
return x + y
async def main() -> None:
async with anyio.create_task_group() as tg:
handle = tg.create_task(add(2, 4))
result = await handle # == 6
print(handle.return_value) # also 6
anyio.run(main)
Signature: tg.create_task(coro, *, name=None, context=None) -> TaskHandle[T]
TaskHandle you can await for the result.TaskFailed (or TaskCancelled).asyncio.gather when you need results.| asyncio | anyio | Notes |
|---|---|---|
asyncio.gather(a, b, c) | tg.create_task(a); tg.create_task(b); tg.create_task(c); results = [await h for h in handles] | No direct gather; structured concurrency requires explicit task group scope. For fire-and-forget, use tg.start_soon. |
asyncio.create_task(coro) | tg.start_soon(func, *args) or tg.create_task(coro) | start_soon takes a coroutine function + args. create_task takes a coroutine object and returns a handle. |
asyncio.sleep(n) | anyio.sleep(n) | Identical semantics. |
asyncio.wait_for(coro, timeout) | with anyio.fail_after(timeout): await coro | Raises TimeoutError. Use move_on_after for silent timeout. |
asyncio.Event() | anyio.Event() | AnyIO events are not reusable; create a new one instead of .clear(). |
asyncio.Lock() | anyio.Lock() | Use async with lock:. Pass fast_acquire=True if performance-critical. |
asyncio.Semaphore(n) | anyio.Semaphore(n) | Same. Pass fast_acquire=True if performance-critical. |
asyncio.Condition() | anyio.Condition() | Same semantics. |
asyncio.run(main()) | anyio.run(main) | Backend-agnostic entry point. |
asyncio.Queue(maxsize=N) | anyio.create_memory_object_stream[T](max_buffer_size=N) | Returns (send_stream, receive_stream). Supports async for on receive end. |
asyncio.to_thread(fn, *args) | anyio.to_thread.run_sync(fn, *args) | Supports abandon_on_cancel=True and custom limiter. |
asyncio.run_coroutine_threadsafe(coro, loop) | anyio.from_thread.run(func, *args) | Call async code from a worker thread. |
loop.call_soon_threadsafe(callback) | anyio.from_thread.run_sync(func, *args) | Call sync code in event loop thread from worker thread, with return value. |
asyncio.shield(coro) | with anyio.CancelScope(shield=True): ... | AnyIO shielding does not orphan tasks. |
asyncio.timeout(delay) | with anyio.fail_after(delay): ... | AnyIO uses level cancellation, not edge cancellation. |
asyncio.CancelledError | anyio.get_cancelled_exc_class() | Use this to catch cancellation portably across backends. |
AnyIO uses level cancellation (inspired by Trio), not asyncio's edge cancellation.
CancelledError is injected once. If caught and not re-raised, the task keeps running.from anyio import CancelScope, create_task_group, get_cancelled_exc_class, sleep, run
async def worker() -> None:
try:
await sleep(10)
except get_cancelled_exc_class():
print("cancelled!")
raise # ALWAYS re-raise cancellation exceptions
async def main() -> None:
async with create_task_group() as tg:
tg.start_soon(worker)
await sleep(0.1)
tg.cancel_scope.cancel() # cancels all children
run(main)
Shield a block from external cancellation. Essential for cleanup.
from anyio import CancelScope, create_task_group, sleep, run
async def main() -> None:
async with create_task_group() as tg:
with CancelScope(shield=True):
tg.start_soon(some_task)
tg.cancel_scope.cancel() # shielded block is protected
await sleep(1) # this still runs
run(main)
Combine with timeouts for graceful shutdown:
from anyio import CancelScope, move_on_after
async def do_something(resource) -> None:
try:
await run_async_stuff()
except BaseException:
# Allow up to 10s for cleanup, then move on
with move_on_after(10, shield=True):
await resource.aclose()
raise
A task group contains its own CancelScope. If any child task raises an exception:
ExceptionGroup if multiple) is re-raised.Two context managers. Both create a CancelScope internally.
fail_after — raises on timeoutfrom anyio import fail_after, sleep, run
async def main() -> None:
try:
with fail_after(5) as scope:
await sleep(10)
except TimeoutError:
print("timed out")
print(scope.cancelled_caught) # True
run(main)
move_on_after — silent timeoutfrom anyio import move_on_after, sleep, run
async def main() -> None:
with move_on_after(5) as scope:
await sleep(10)
print("this never prints")
print("exited scope, cancelled =", scope.cancelled_caught)
run(main)
from anyio import move_on_after
# Give cleanup 10 seconds, but don't let outer cancellation interrupt it
with move_on_after(10, shield=True):
await resource.aclose()
Replaces asyncio.Queue with a safer, typed, structured-concurrency-friendly construct.
from anyio import create_task_group, create_memory_object_stream, run
from anyio.streams.memory import MemoryObjectReceiveStream
async def consumer(stream: MemoryObjectReceiveStream[str]) -> None:
async with stream: # closes receive end on exit
async for item in stream:
print("received", item)
async def main() -> None:
# Type-annotated stream creation (AnyIO 4+ syntax)
send_stream, receive_stream = create_memory_object_stream[str](max_buffer_size=10)
async with create_task_group() as tg:
tg.start_soon(consumer, receive_stream)
async with send_stream:
for i in range(5):
await send_stream.send(f"item {i}")
# send_stream closed → consumer's async for loop exits naturally
run(main)
Key differences from asyncio.Queue:
max_buffer_size=0 means send blocks until a receiver is ready.async for item in receive_stream: works out of the box.create_memory_object_stream[T]().close() and async with work.AnyIO is backend-agnostic. Code written against AnyIO APIs runs on both asyncio and Trio.
import anyio
async def main() -> None:
print("running on", anyio.current_async_library())
await anyio.sleep(1)
# Default backend (asyncio)
anyio.run(main)
# Explicit backend
anyio.run(main, backend="trio")
anyio.run(main, backend="asyncio", backend_options={"debug": True})
Library design rule: Never hardcode a backend. Let the application choose via anyio.run(). Libraries should only import anyio and avoid backend-specific APIs.
If a third-party library exposes only an asyncio interface (returns asyncio coroutine objects), it works directly under the asyncio backend because AnyIO runs on top of asyncio's event loop:
import anyio
import some_asyncio_only_lib # returns asyncio.Future/coroutine objects
async def main() -> None:
# This works because under the asyncio backend, await passes through
result = await some_asyncio_only_lib.fetch_data()
anyio.run(main, backend="asyncio")
Important: This only works on the asyncio backend. On the trio backend, asyncio-native objects will not work.
Some APIs have no AnyIO equivalent and require direct event loop access:
| Scenario | asyncio API | AnyIO approach |
|---|---|---|
| Signal handlers | loop.add_signal_handler() | anyio.open_signal_receiver() |
| Custom protocols | asyncio.Protocol | Use AnyIO streams / sockets |
| Direct Future manipulation | asyncio.Future | Avoid; use AnyIO primitives |
| Eager task factories | asyncio.eager_task_factory | Experimental in AnyIO; avoid |
If you absolutely need the running loop:
import asyncio
async def main() -> None:
loop = asyncio.get_running_loop()
# ... do something loop-specific ...
# WARNING: this breaks backend-agnosticism
anyio.run(main, backend="asyncio")
Best practice: Wrap asyncio-only code in a backend-agnostic facade, and document that the feature requires the asyncio backend.
import anyio
async def fetch(url: str) -> bytes:
await anyio.sleep(0.5) # simulate
return b"data"
async def main() -> None:
urls = ["a", "b", "c"]
async with anyio.create_task_group() as tg:
with anyio.move_on_after(5):
for url in urls:
tg.start_soon(fetch, url)
# All tasks are cancelled on timeout; task group waits for cleanup
anyio.run(main)
import anyio
from anyio.streams.memory import MemoryObjectReceiveStream
async def producer(send_stream: anyio.streams.memory.MemoryObjectSendStream[int]) -> None:
async with send_stream:
for i in range(100):
await send_stream.send(i)
async def consumer(receive_stream: MemoryObjectReceiveStream[int]) -> None:
async with receive_stream:
async for item in receive_stream:
print(f"consumed {item}")
async def main() -> None:
send, receive = anyio.create_memory_object_stream[int](max_buffer_size=5)
async with anyio.create_task_group() as tg:
tg.start_soon(producer, send)
tg.start_soon(consumer, receive)
anyio.run(main)
import time
import anyio
async def main() -> None:
# Run blocking function in worker thread
result = await anyio.to_thread.run_sync(time.sleep, 2)
print("done")
anyio.run(main)
import anyio
def blocking_callback() -> None:
# Inside a worker thread, call back into the event loop
anyio.from_thread.run(anyio.sleep, 1)
anyio.from_thread.run_sync(print, "hello from thread")
async def main() -> None:
await anyio.to_thread.run_sync(blocking_callback)
anyio.run(main)
import anyio
async def worker() -> None:
try:
await anyio.sleep_forever()
except anyio.get_cancelled_exc_class():
with anyio.CancelScope(shield=True):
await anyio.sleep(0.5) # cleanup
print("cleaned up")
raise
async def main() -> None:
async with anyio.create_task_group() as tg:
tg.start_soon(worker)
await anyio.sleep(1)
tg.cancel_scope.cancel()
anyio.run(main)
cb245dba): https://github.com/agronholm/anyio