Complete Guide to Python Asyncio
Master asynchronous Python programming with asyncio. Covers coroutines, tasks, event loops, async/await, gather, semaphores, queues, HTTP clients, websockets, and debugging async code.
Note: This guide follows English-language naming conventions and terminology standards common in international development teams. Examples use English identifiers and comments to maximize compatibility across codebases and tooling.
Complete Guide to Python Asyncio
Introduction
Asyncio is Python’s framework for writing concurrent code using async/await syntax. It uses a single-threaded event loop to manage multiple coroutines, making it ideal for I/O-bound workloads like HTTP requests, database queries, and websocket connections. This guide covers coroutines, tasks, the event loop, concurrency primitives, async HTTP clients, websockets, and debugging.
Coroutines and async/await
Basic coroutine
import asyncio
async def fetch_data(url: str) -> str:
print(f"Fetching {url}")
await asyncio.sleep(1) # Simulate I/O
return f"Data from {url}"
async def main():
result = await fetch_data("https://example.com")
print(result)
asyncio.run(main())
Running multiple coroutines sequentially
async def main():
start = asyncio.get_event_loop().time()
result1 = await fetch_data("https://api1.example.com")
result2 = await fetch_data("https://api2.example.com")
result3 = await fetch_data("https://api3.example.com")
elapsed = asyncio.get_event_loop().time() - start
print(f"Sequential: {elapsed:.2f}s") # ~3.0s
Running concurrently with asyncio.gather
async def main():
start = asyncio.get_event_loop().time()
results = await asyncio.gather(
fetch_data("https://api1.example.com"),
fetch_data("https://api2.example.com"),
fetch_data("https://api3.example.com"),
)
elapsed = asyncio.get_event_loop().time() - start
print(f"Concurrent: {elapsed:.2f}s") # ~1.0s
print(results)
Error handling with gather
async def fetch_with_error(url: str) -> str:
if "error" in url:
raise ValueError(f"Failed to fetch {url}")
await asyncio.sleep(0.5)
return f"Data from {url}"
async def main():
# return_exceptions=True keeps errors as results instead of raising
results = await asyncio.gather(
fetch_with_error("https://good.example.com"),
fetch_with_error("https://error.example.com"),
fetch_with_error("https://good2.example.com"),
return_exceptions=True,
)
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"Success: {result}")
Tasks
Creating tasks manually
async def main():
# Schedule coroutines as tasks — they start running immediately
task1 = asyncio.create_task(fetch_data("https://api1.example.com"))
task2 = asyncio.create_task(fetch_data("https://api2.example.com"))
# Do other work while tasks run
print("Tasks started, doing other work...")
await asyncio.sleep(0.5)
# Await tasks when you need results
result1 = await task1
result2 = await task2
print(result1, result2)
Task cancellation
async def long_running():
try:
while True:
print("Working...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task cancelled, cleaning up...")
raise # Re-raise to propagate cancellation
async def main():
task = asyncio.create_task(long_running())
await asyncio.sleep(3.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
Task groups (Python 3.11+)
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_data("https://api1.example.com"))
task2 = tg.create_task(fetch_data("https://api2.example.com"))
task3 = tg.create_task(fetch_data("https://api3.example.com"))
# All tasks complete when the context manager exits
print(task1.result(), task2.result(), task3.result())
The Event Loop
Running the event loop
# asyncio.run() — recommended for top-level entry point
asyncio.run(main())
# Manual loop control (for advanced use cases)
async def main():
loop = asyncio.get_running_loop()
print(f"Running on: {loop}")
# Run in background thread (for mixing sync/async code)
import asyncio
from threading import Thread
class AsyncRunner:
def __init__(self):
self.loop = asyncio.new_event_loop()
self.thread = Thread(target=self.loop.run_forever, daemon=True)
self.thread.start()
def submit(self, coro):
return asyncio.run_coroutine_threadsafe(coro, self.loop).result()
runner = AsyncRunner()
result = runner.submit(fetch_data("https://example.com"))
Semaphores (Limiting Concurrency)
async def fetch_with_limit(url: str, semaphore: asyncio.Semaphore) -> str:
async with semaphore:
await asyncio.sleep(0.5)
return f"Data from {url}"
async def main():
semaphore = asyncio.Semaphore(10) # Max 10 concurrent
urls = [f"https://api{i}.example.com" for i in range(100)]
tasks = [fetch_with_limit(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} URLs")
Queues
Producer-consumer pattern
async def producer(queue: asyncio.Queue, items: list):
for item in items:
await queue.put(item)
print(f"Produced: {item}")
await queue.put(None) # Sentinel
async def consumer(queue: asyncio.Queue, consumer_id: int):
while True:
item = await queue.get()
if item is None:
await queue.put(None) # Pass sentinel to next consumer
break
await asyncio.sleep(0.5)
print(f"Consumer {consumer_id} processed: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
items = list(range(20))
producers = [asyncio.create_task(producer(queue, items))]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
await asyncio.gather(*producers)
await queue.join()
for c in consumers:
c.cancel()
Async HTTP Clients
aiohttp
import aiohttp
import asyncio
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
return await response.json()
async def fetch_many(urls: list) -> list:
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [f"https://api.example.com/users/{i}" for i in range(50)]
results = await fetch_many(urls)
print(f"Fetched {len(results)} users")
asyncio.run(main())
httpx (sync + async)
import httpx
import asyncio
async def fetch_with_httpx(urls: list) -> list:
async with httpx.AsyncClient(timeout=30, limits=httpx.Limits(max_connections=20)) as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]
async def main():
urls = [f"https://api.example.com/items/{i}" for i in range(100)]
results = await fetch_with_httpx(urls)
print(f"Got {len(results)} items")
Retry with tenacity
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import httpx
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type((httpx.HTTPStatusError, httpx.RequestError)),
)
async def fetch_with_retry(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
Websockets
Server
import asyncio
import websockets
connected = set()
async def handler(websocket):
connected.add(websocket)
try:
async for message in websocket:
# Broadcast to all connected clients
websockets.broadcast(connected, message)
finally:
connected.remove(websocket)
async def main():
async with websockets.serve(handler, "localhost", 8765):
await asyncio.Future() # Run forever
asyncio.run(main())
Client
import asyncio
import websockets
async def client():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as ws:
await ws.send("Hello, server!")
response = await ws.recv()
print(f"Received: {response}")
asyncio.run(client())
Mixing Sync and Async Code
asyncio.to_thread (Python 3.9+)
import asyncio
import time
def blocking_io(duration: float) -> str:
time.sleep(duration) # Blocking call
return f"Slept for {duration}s"
async def main():
# Run blocking function in a thread
result = await asyncio.to_thread(blocking_io, 2.0)
print(result)
asyncio.run(main())
run_in_executor
async def main():
loop = asyncio.get_running_loop()
# Use default thread pool
result = await loop.run_in_executor(None, blocking_io, 2.0)
# Use process pool for CPU-bound work
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_intensive_work, data)
Debugging Async Code
Enable debug mode
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
async def main():
asyncio.get_running_loop().set_debug(True)
# Log slow callbacks (> 100ms)
asyncio.get_running_loop().slow_callback_duration = 0.1
await some_operation()
asyncio.run(main(), debug=True)
Common pitfalls
# PITFALL 1: Forgetting await — coroutine never runs
async def bad():
fetch_data("https://example.com") # Missing await!
# RuntimeWarning: coroutine 'fetch_data' was never awaited
# PITFALL 2: Blocking call in async code
async def bad():
time.sleep(5) # Blocks the entire event loop!
async def good():
await asyncio.sleep(5) # Non-blocking
# PITFALL 3: Creating coroutine without scheduling
async def bad():
coro = fetch_data("https://example.com")
# Never awaited, never scheduled
# PITFALL 4: Using requests (sync) in async code
import requests
async def bad():
response = requests.get("https://example.com") # Blocks event loop!
async def good():
async with httpx.AsyncClient() as client:
response = await client.get("https://example.com")
Best Practices
- Use
asyncio.run()as entry point — creates and closes the event loop properly - Use
asyncio.gather()for concurrent I/O — runs coroutines concurrently, waits for all - Limit concurrency with semaphores — prevent overwhelming external services
- Use
asyncio.TaskGroup(3.11+) — better error handling thangather - Never call blocking functions in async code — use
asyncio.to_thread()instead - Use async HTTP clients —
aiohttporhttpx, neverrequests - Set timeouts —
asyncio.wait_for()orasyncio.timeout()to prevent hangs - Handle
CancelledError— clean up resources when tasks are cancelled - Use
return_exceptions=Truewhen you want to handle errors per-task - Prefer
asyncio.Queueover threading.Queue — works with the event loop - Enable debug mode in development — catches missing awaits and slow callbacks
- Use type hints with
Coroutine— improve IDE support and catch type errors
Common Mistakes
- Forgetting
await— coroutine is created but never executed - Calling
time.sleep()instead ofawait asyncio.sleep()— blocks the event loop - Using
requestslibrary in async code — it blocks the event loop - Not limiting concurrency — thousands of simultaneous requests overwhelm servers
- Not handling
CancelledError— resources leak when tasks are cancelled - Mixing
asyncio.run()calls — only one event loop should run per process - Not setting timeouts — a slow response hangs the entire application
- Using
asyncio.get_event_loop()in modern code — useasyncio.get_running_loop()orasyncio.run() - Creating tasks without keeping references — garbage collector may cancel them
- Not using
async withfor resources — connections leak without proper cleanup
Frequently Asked Questions
When should I use asyncio vs threading vs multiprocessing?
Use asyncio for I/O-bound workloads (HTTP requests, database queries, file I/O) — it handles thousands of concurrent connections with a single thread. Use threading for I/O-bound code that uses blocking libraries (like requests). Use multiprocessing for CPU-bound work (data processing, computation) — asyncio and threading are limited by the GIL.
Can I use asyncio with Flask?
Flask is synchronous by default. For async support, use Flask 2.0+ with async def route handlers, or switch to an async framework like FastAPI, Quart, or Starlette. FastAPI is the most popular choice — it uses Starlette’s asyncio under the hood and supports async/await natively.
How do I test async code?
Use pytest-asyncio:
import pytest
@pytest.mark.asyncio
async def test_fetch_data():
result = await fetch_data("https://example.com")
assert "Data from" in result