Async Python Demystified: A Practical Deep Dive into asyncio
Learn how Python's asyncio really works — from the event loop and coroutines to gather, TaskGroup, timeouts, semaphores, and running blocking code without freezing your program.
If you have ever watched a script crawl through a list of HTTP requests one at a time — each one sitting idle while the network does its thing — you have felt the problem that asyncio was built to solve. Most real programs spend the bulk of their time waiting: on sockets, on disk, on databases, on other services. Asynchronous programming lets a single thread put that waiting time to work, juggling thousands of in-flight operations without the overhead of threads or processes.
The trouble is that asyncio has a reputation for being confusing. The async/await syntax looks simple, but the mental model underneath — coroutines, the event loop, tasks, futures — trips up a lot of people. This guide builds that model from the ground up, with runnable examples you can paste into a file and try immediately.
The core idea: cooperative concurrency
Threads use preemptive multitasking: the operating system can pause any thread at any moment. asyncio uses cooperative multitasking instead. Your code runs on a single thread, and the loop can switch to another task only when the current one reaches an await that actually has to wait, an explicit point where it says "I'm going to wait now, go run something else." This is why async code is often easier to reason about than threaded code: switches happen only at await points you can see.
The component that orchestrates all this is the event loop. It keeps a queue of ready-to-run tasks, runs each one until it awaits something that isn't ready yet, and parks it until whatever it was waiting for is done. A coroutine function is what you write with async def; calling it doesn't run the body. Instead it returns a coroutine object that the loop must drive.
import asyncio

async def greet(name):
    print(f"Hello, {name}")
    await asyncio.sleep(1)  # yields control back to the loop
    print(f"Goodbye, {name}")

asyncio.run(greet("world"))
asyncio.run() is your entry point: it creates an event loop, runs the coroutine to completion, and cleans up. You typically call it once, at the top level of your program.
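The claim that calling a coroutine function runs nothing is easy to check. The following is a minimal sketch; the function name answer is made up for illustration.

```python
import asyncio

async def answer():  # hypothetical example coroutine
    return 42

coro = answer()                    # nothing has run yet
print(type(coro).__name__)         # coroutine
print(asyncio.iscoroutine(coro))   # True
result = asyncio.run(coro)         # the loop drives it to completion
print(result)                      # 42
```

Only the asyncio.run() line executes the function body; everything before it just builds and inspects an object.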
Running things concurrently with gather
Awaiting one coroutine after another is still sequential. The payoff comes when you launch several operations and let them overlap. asyncio.gather() schedules multiple coroutines at once and waits for all of them.
import asyncio
import time

async def fetch(name, delay):
    print(f"start {name}")
    await asyncio.sleep(delay)  # stand-in for a network call
    print(f"done {name}")
    return f"{name}:{delay}"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch("a", 0.2),
        fetch("b", 0.1),
        fetch("c", 0.3),
    )
    print(results)
    print(f"elapsed {time.perf_counter() - start:.2f}s")

asyncio.run(main())
The three "fetches" total 0.6 seconds of sleeping, but the program finishes in about 0.3 seconds — the time of the slowest one — because they wait concurrently. gather returns results in the order you passed the coroutines, not the order they completed.
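To see where the overlap actually happens, it can help to trace the order of execution. The sketch below is illustrative only (worker and the log list are invented names): two coroutines each record three steps, with a single await between the second and third.

```python
import asyncio

async def worker(name, log):  # hypothetical helper for tracing
    log.append(f"{name}:1")
    log.append(f"{name}:2")   # no await in between, so no switch can occur here
    await asyncio.sleep(0)    # explicit switch point: let other tasks run
    log.append(f"{name}:3")

async def main():
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log

print(asyncio.run(main()))
# ['a:1', 'a:2', 'b:1', 'b:2', 'a:3', 'b:3']
```

Steps 1 and 2 of each worker always stay together, because there is no await between them; the interleaving appears only at the sleep(0) call.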
Tasks: scheduling work in the background
Under the hood, gather wraps each coroutine in a Task. A task is a coroutine that the loop has started tracking. You can create one explicitly with asyncio.create_task() when you want something running in the background while you do other work.
import asyncio

async def heartbeat():
    while True:
        print("tick")
        await asyncio.sleep(0.5)

async def main():
    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(1.6)  # let it tick a few times
    task.cancel()             # stop the background task
    try:
        await task
    except asyncio.CancelledError:
        print("heartbeat stopped")

asyncio.run(main())
An important gotcha: if you create a task and never keep a reference to it, the garbage collector can destroy it mid-flight. Always store tasks in a variable or collection until they're done.
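One common way to follow that advice for fire-and-forget work is to park each task in a set and let it remove itself when it finishes. This is a sketch of that pattern; spawn, job, and background_tasks are names chosen for this example, not part of asyncio.

```python
import asyncio

background_tasks = set()  # strong references keep the tasks alive

async def job(n, results):  # hypothetical unit of work
    await asyncio.sleep(0.01)
    results.append(n)

def spawn(coro):
    """Start a task and hold a reference to it until it is done."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # drop the reference afterwards
    return task

async def main():
    results = []
    for n in range(3):
        spawn(job(n, results))
    # Wait for everything still pending before main() returns.
    await asyncio.gather(*background_tasks)
    return sorted(results)

print(asyncio.run(main()))  # [0, 1, 2]
```

The done callback keeps the set from growing without bound in a long-running program.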
TaskGroup: the modern, safer way (Python 3.11+)
Since Python 3.11, asyncio.TaskGroup is the recommended way to run a batch of tasks. It improves on gather in one crucial way: if any task fails, the others are automatically cancelled, and you don't leak half-finished work. It's structured concurrency — tasks can't outlive the block that created them.
import asyncio

async def work(n):
    await asyncio.sleep(0.1)
    return n * n

async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(work(i)) for i in range(5)]
    # the 'async with' block exits only after all tasks finish
    print([t.result() for t in tasks])

asyncio.run(main())
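If you're on Python 3.11 or newer, prefer TaskGroup over bare gather for anything non-trivial. On older versions there is no exact equivalent. gather(..., return_exceptions=True) comes closest for collecting errors without one failure aborting the batch: exceptions are returned in the results list alongside normal values, but unlike TaskGroup the remaining tasks are not cancelled and keep running to completion.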
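As a rough illustration of the gather fallback, the sketch below runs four jobs, one of which fails. The work function and its failure condition are invented for the example.

```python
import asyncio

async def work(n):  # hypothetical job; fails for one input
    await asyncio.sleep(0.01)
    if n == 2:
        raise ValueError(f"bad input: {n}")
    return n * n

async def main():
    results = await asyncio.gather(
        *(work(i) for i in range(4)),
        return_exceptions=True,  # exceptions come back as ordinary list items
    )
    values = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return values, errors

values, errors = asyncio.run(main())
print(values)  # [0, 1, 9]
print(errors)  # [ValueError('bad input: 2')]
```

Note that nothing is raised here: it is up to the caller to inspect the list and decide what to do with the failures.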
Timeouts and cancellation
Network calls hang. A robust async program puts a ceiling on how long it will wait. Python 3.11 added asyncio.timeout(), a clean context manager for this:
import asyncio

async def slow():
    await asyncio.sleep(5)

async def main():
    try:
        async with asyncio.timeout(0.2):
            await slow()
    except TimeoutError:
        print("timed out as expected")

asyncio.run(main())
On Python 3.10 and earlier, use asyncio.wait_for(slow(), timeout=0.2) and catch asyncio.TimeoutError, which was a separate exception class until 3.11 made it an alias of the built-in TimeoutError. Either way, the awaited operation is cancelled when the limit is hit, so you don't keep paying for work you no longer need.
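A version-portable variant might look like the following sketch. It uses wait_for and records whether the slow coroutine really was cancelled; the state dictionary is just a device for this example.

```python
import asyncio

async def slow(state):
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        state["cancelled"] = True  # wait_for cancelled us at the deadline
        raise

async def main():
    state = {"cancelled": False, "timed_out": False}
    try:
        await asyncio.wait_for(slow(state), timeout=0.05)
    except asyncio.TimeoutError:   # same class as TimeoutError on 3.11+
        state["timed_out"] = True
    return state

print(asyncio.run(main()))
# {'cancelled': True, 'timed_out': True}
```

Catching asyncio.TimeoutError works on both old and new Python versions, which makes it the safer spelling in code that has to support several.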
Limiting concurrency with a semaphore
Concurrency is powerful, but firing off ten thousand requests at once will get you rate-limited or run you out of file descriptors. An asyncio.Semaphore caps how many coroutines can be inside a critical section at the same time.
import asyncio

# At most 2 at once. Creating this at module level is fine on Python 3.10+;
# on older versions, create the semaphore inside main() instead.
sem = asyncio.Semaphore(2)
active = 0
peak = 0

async def job(i):
    global active, peak
    async with sem:
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.05)
        active -= 1

async def main():
    await asyncio.gather(*(job(i) for i in range(10)))
    print("peak concurrency:", peak)  # never exceeds 2

asyncio.run(main())
This pattern — a semaphore wrapping the "expensive" part of each task — is the standard way to build a bounded worker pool in async code.
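If you find yourself repeating the pattern, it can be wrapped in a small helper. The sketch below is one possible shape, not a standard API: bounded_gather, fetch, and the stats dictionary are all hypothetical names. Creating the semaphore inside the helper also avoids any module-level state.

```python
import asyncio

async def bounded_gather(limit, coros):
    """Run coroutines concurrently, at most `limit` at a time (hypothetical helper)."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))

async def fetch(i, stats):  # stand-in for a real network call
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)
    stats["active"] -= 1
    return i * 10

async def main():
    stats = {"active": 0, "peak": 0}
    results = await bounded_gather(3, [fetch(i, stats) for i in range(10)])
    return results, stats["peak"]

results, peak = asyncio.run(main())
print(results)  # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
print(peak)     # 3
```

Results still come back in submission order, because the helper delegates to gather.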
The cardinal sin: blocking the event loop
Because everything runs on one thread, a single blocking call freezes everything. If you call time.sleep(5) or requests.get(), or run a CPU-heavy loop, inside a coroutine, the event loop cannot switch to any other task for the entire duration. Your "concurrent" program grinds to a halt.
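A quick timing comparison makes the cost visible. In this sketch (function names are illustrative), three coroutines are run through gather twice: once with a blocking time.sleep and once with the cooperative asyncio.sleep.

```python
import asyncio
import time

async def blocking_wait():
    time.sleep(0.1)            # blocks the whole thread, event loop included

async def cooperative_wait():
    await asyncio.sleep(0.1)   # yields to the loop while waiting

async def timed(make_coro):
    """Run three copies concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(make_coro(), make_coro(), make_coro())
    return time.perf_counter() - start

async def main():
    blocking = await timed(blocking_wait)
    cooperative = await timed(cooperative_wait)
    return blocking, cooperative

blocking, cooperative = asyncio.run(main())
print(f"blocking:    {blocking:.2f}s")     # roughly 0.3s: the sleeps run back to back
print(f"cooperative: {cooperative:.2f}s")  # roughly 0.1s: the sleeps overlap
```

Exact numbers will vary by machine, but the blocking version should take about three times as long, because gather cannot overlap calls that never yield.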
The rule is simple: never call blocking, synchronous code directly inside a coroutine. When you must use a blocking library, hand it off to a thread pool with asyncio.to_thread() (Python 3.9+), which keeps the loop free:
import asyncio
import time

def blocking_io(n):
    time.sleep(0.1)  # a synchronous, blocking call
    return n * 2

async def main():
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_io, i) for i in range(5))
    )
    print(results)

asyncio.run(main())
For CPU-bound work, threads won't help because of the GIL — reach for loop.run_in_executor() with a ProcessPoolExecutor, or step outside asyncio entirely.
Reacting to results as they arrive
Sometimes you don't want to wait for the whole batch — you want to process each result the moment it's ready. asyncio.as_completed() yields awaitables in completion order:
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    coros = [task("slow", 0.3), task("fast", 0.1), task("mid", 0.2)]
    for fut in asyncio.as_completed(coros):
        print("finished:", await fut)  # fast, then mid, then slow

asyncio.run(main())
Common pitfalls to remember
A few mistakes account for most async bugs. Calling a coroutine function without awaiting the result runs nothing, because the call only creates a coroutine object; Python emits a "coroutine was never awaited" RuntimeWarning when that object is discarded. Mixing blocking libraries (plain requests, synchronous database drivers) into async code silently destroys your concurrency; use async-native libraries like httpx, aiohttp, or asyncpg instead. And calling asyncio.run() from inside an already-running loop raises a RuntimeError. Calling it several times in sequence is legal, but each call builds and tears down a fresh event loop, so most programs are best served by a single top-level run().
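The running-loop case can be reproduced in a few lines. This is a small sketch with made-up names (inner, outer): it tries asyncio.run() from inside a coroutine, captures the error, and then does the right thing by awaiting instead.

```python
import asyncio

async def inner():  # hypothetical coroutine we want to run
    return 42

async def outer():
    coro = inner()
    try:
        asyncio.run(coro)          # wrong: a loop is already running here
    except RuntimeError as exc:
        message = str(exc)
    finally:
        coro.close()               # avoid a "never awaited" warning
    value = await inner()          # right: just await it
    return message, value

message, value = asyncio.run(outer())
print(message)
print(value)  # 42
```

Inside a coroutine, await (or create_task) is how you run another coroutine; asyncio.run() belongs only at the outermost, synchronous level.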
Wrap-up and next steps
You now have the working model: an event loop drives coroutines, await marks the cooperative switch points, and tasks let work proceed in the background. gather and TaskGroup run things concurrently, asyncio.timeout and wait_for keep slow operations in check, semaphores bound your concurrency, and to_thread rescues you from blocking code.
From here, the natural next step is to apply it to real I/O. Install httpx or aiohttp and rewrite a synchronous "download these 50 URLs" script as an async one with a semaphore — you'll see the speedup firsthand. After that, explore async context managers (async with), async iterators (async for), and an async web framework like FastAPI to see how these primitives scale into production systems.