Make Concurrent HTTP Requests with Python and aiohttp
Fetch multiple APIs concurrently using asyncio and aiohttp. Covers connection pooling, rate limiting, retries, and batch processing.
Overview
Making HTTP requests one at a time is slow when you need to fetch from multiple APIs or endpoints. asyncio with aiohttp lets you run many requests concurrently, which cuts total time from the sum of all request times to roughly the duration of the slowest single request, provided nothing else caps concurrency. This recipe covers concurrent fetching, connection pooling, rate limiting, retries, and batch processing.
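To make the timing claim concrete, here is a minimal sketch that swaps real HTTP calls for asyncio.sleep so it runs without a network. The names fake_request, run_sequential, run_concurrent, and compare, as well as the delays, are illustrative assumptions and not part of aiohttp.

```python
import asyncio
import time

async def fake_request(delay: float) -> float:
    # Stand-in for an HTTP call: waits `delay` seconds without blocking the loop
    await asyncio.sleep(delay)
    return delay

async def run_sequential(delays: list[float]) -> float:
    # Awaits each request before starting the next one
    start = time.perf_counter()
    for d in delays:
        await fake_request(d)
    return time.perf_counter() - start

async def run_concurrent(delays: list[float]) -> float:
    # Starts every request at once and waits for all of them
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(d) for d in delays))
    return time.perf_counter() - start

async def compare(delays: list[float]) -> tuple[float, float]:
    return await run_sequential(delays), await run_concurrent(delays)

if __name__ == "__main__":
    sequential, concurrent = asyncio.run(compare([0.1, 0.2, 0.3]))
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential run should take about as long as the delays added together, while the concurrent run should take about as long as the largest delay.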
When to Use
- You need to fetch data from multiple APIs or endpoints simultaneously
- You are building a web scraper that fetches many pages
- You need to call multiple microservices and aggregate results
- Sequential HTTP requests are too slow for your use case
Solution
Install aiohttp
pip install aiohttp
Basic concurrent requests
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Usage
urls = [
    "https://api.github.com/users/octocat",
    "https://api.github.com/users/torvalds",
    "https://api.github.com/users/gvanrossum",
]
results = asyncio.run(fetch_all(urls))
for r in results:
    print(r["login"])
Connection pooling with ClientSession
import asyncio
import aiohttp

async def fetch_with_pool(urls: list[str]) -> list[dict]:
    # Reuse a single session for all requests so connections are pooled
    connector = aiohttp.TCPConnector(
        limit=100,           # Max total connections
        limit_per_host=10,   # Max connections per host
        ttl_dns_cache=300,   # DNS cache TTL in seconds
    )
    timeout = aiohttp.ClientTimeout(total=30, connect=10)
    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        # fetch() is the helper defined under "Basic concurrent requests"
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)
Rate limiting with a semaphore
import asyncio
import aiohttp

async def rate_limited_fetch(urls: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_limited(session: aiohttp.ClientSession, url: str) -> dict:
        # At most max_concurrent coroutines are inside this block at once
        async with semaphore:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Limit to 5 concurrent requests
results = asyncio.run(rate_limited_fetch(urls, max_concurrent=5))
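Note that a semaphore caps the number of requests in flight, not the number of requests per second. The following sketch, which uses asyncio.sleep in place of the HTTP call, measures the peak number of simultaneous workers to show the cap holding. measure_peak and its counters are hypothetical names introduced for this illustration.

```python
import asyncio

async def measure_peak(n_tasks: int, max_concurrent: int) -> int:
    """Run n_tasks workers and return the highest number active at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def worker() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the HTTP request
            in_flight -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak

if __name__ == "__main__":
    print(asyncio.run(measure_peak(n_tasks=20, max_concurrent=5)))
```

With 20 workers and a limit of 5, the peak never exceeds 5 even though all 20 coroutines are scheduled immediately.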
Retries with exponential backoff
import asyncio
import aiohttp
import logging

logger = logging.getLogger(__name__)

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3,
    backoff_factor: float = 0.5,
) -> dict:
    for attempt in range(max_retries):
        wait = backoff_factor * (2 ** attempt)  # 0.5s, 1s, 2s, ...
        try:
            async with session.get(url) as response:
                if response.status == 429:
                    # Retry-After may also be an HTTP date; only the
                    # delta-seconds form is handled here.
                    header = response.headers.get("Retry-After", "")
                    retry_after = float(header) if header.isdigit() else wait
                    logger.warning(f"Rate limited, retrying in {retry_after}s")
                    await asyncio.sleep(retry_after)
                    continue
                response.raise_for_status()
                return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"Attempt {attempt + 1} failed: {e}, retrying in {wait}s")
            await asyncio.sleep(wait)
    # Every attempt was rate limited: fail loudly instead of returning None
    raise RuntimeError(f"Still rate limited after {max_retries} attempts: {url}")

async def fetch_all_with_retry(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        return await asyncio.gather(*tasks)
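The Retry-After header read in the retry loop above can carry either a number of seconds or an HTTP date. The sketch below shows one way to handle both forms using only the standard library. parse_retry_after and its default and now parameters are hypothetical names introduced here, not part of aiohttp.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional

def parse_retry_after(
    value: Optional[str],
    default: float,
    now: Optional[datetime] = None,
) -> float:
    """Return the number of seconds to wait, falling back to `default`."""
    if not value:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)  # delta-seconds form, e.g. "120"
    try:
        retry_at = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default  # unparseable header: use the backoff delay instead
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means the caller may retry immediately
    return max(0.0, (retry_at - now).total_seconds())
```

Passing the computed backoff delay as default keeps the exponential schedule in effect whenever the server sends no usable header.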
Batch processing with return_exceptions
import asyncio
import aiohttp

async def fetch_all_safe(urls: list[str]) -> list:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        # return_exceptions=True returns failures as values in the result
        # list instead of raising the first one to the caller
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# Usage: handle partial failures
results = asyncio.run(fetch_all_safe(urls))
for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"URL {i} failed: {result}")
    else:
        print(f"URL {i}: {result.get('login', 'unknown')}")
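Because gather returns results in submission order, each outcome can be paired with its URL by position. The sketch below splits a batch into successes and failures keyed by URL. The flaky coroutine is a stand-in for fetch that fails for any URL containing "bad", and fetch_partitioned is a hypothetical helper name.

```python
import asyncio

async def flaky(url: str) -> dict:
    # Stand-in for fetch(): fails for URLs containing "bad"
    await asyncio.sleep(0)
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return {"url": url}

async def fetch_partitioned(urls: list[str]) -> tuple[dict, dict]:
    results = await asyncio.gather(
        *(flaky(u) for u in urls), return_exceptions=True
    )
    successes: dict = {}
    failures: dict = {}
    # gather preserves submission order, so zip pairs each URL with its outcome
    for url, result in zip(urls, results):
        if isinstance(result, BaseException):
            failures[url] = result
        else:
            successes[url] = result
    return successes, failures

if __name__ == "__main__":
    ok, failed = asyncio.run(fetch_partitioned(["a", "bad-1", "b"]))
    print(sorted(ok), sorted(failed))
```

Checking against BaseException rather than Exception also catches cancellation errors, which gather can return as values in this mode.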
Processing results as they complete
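import asyncio
import aiohttp

async def fetch_tagged(session: aiohttp.ClientSession, url: str) -> tuple[str, object]:
    # as_completed yields new awaitables, not the original tasks, so each
    # result carries its own URL instead of being looked up by task.
    try:
        return url, await fetch(session, url)
    except Exception as e:
        return url, e

async def fetch_progressive(urls: list[str]) -> None:
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch_tagged(session, url)) for url in urls]
        for completed in asyncio.as_completed(tasks):
            url, result = await completed
            if isinstance(result, Exception):
                print(f"Failed: {url} -> {result}")
            else:
                print(f"Done: {url} -> {result.get('login', 'unknown')}")

asyncio.run(fetch_progressive(urls))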
POST requests with JSON body
import asyncio
import aiohttp

async def post_data(session: aiohttp.ClientSession, url: str, data: dict) -> dict:
    # json=data serializes the dict and sets the Content-Type header
    async with session.post(url, json=data) as response:
        response.raise_for_status()
        return await response.json()

async def create_users(users: list[dict]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [post_data(session, "https://httpbin.org/post", user) for user in users]
        return await asyncio.gather(*tasks)

users = [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}]
results = asyncio.run(create_users(users))
Custom headers and authentication
import asyncio
import aiohttp

async def fetch_authenticated(urls: list[str], token: str) -> list[dict]:
    # Headers set on the session are sent with every request it makes
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "User-Agent": "MyApp/1.0",
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)
Explanation
asyncio is Python’s async I/O framework. It runs tasks concurrently on a single thread using an event loop. aiohttp is an async HTTP client/server that integrates with asyncio.
Key concepts:
- ClientSession: The equivalent of requests.Session. Reuses TCP connections across requests. Use a single session for all requests in a workflow.
- asyncio.gather: Runs multiple awaitables concurrently and returns results in submission order. By default the first exception propagates to the caller while the remaining tasks keep running; with return_exceptions=True, exceptions are returned in the results list instead.
- Semaphore: Limits the number of concurrent operations. Use it to avoid overwhelming the server or hitting rate limits.
- as_completed: Returns results as they finish, not in submission order. Useful for progress reporting.
- TCPConnector: Controls connection pooling. limit sets the maximum total connections; limit_per_host sets the maximum per host.
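The ordering difference between gather and as_completed can be seen in a small sketch that needs no network. The job coroutine, the task names, and the delays are illustrative assumptions.

```python
import asyncio

async def job(name: str, delay: float) -> str:
    # Stand-in for a request that takes `delay` seconds
    await asyncio.sleep(delay)
    return name

async def compare_ordering() -> tuple[list[str], list[str]]:
    specs = [("slow", 0.15), ("fast", 0.01), ("medium", 0.08)]

    # gather: results come back in the order the awaitables were passed in
    gathered = await asyncio.gather(*(job(n, d) for n, d in specs))

    # as_completed: results come back in the order the jobs finish
    finished = []
    for next_done in asyncio.as_completed([job(n, d) for n, d in specs]):
        finished.append(await next_done)

    return list(gathered), finished

if __name__ == "__main__":
    gathered, finished = asyncio.run(compare_ordering())
    print("gather:      ", gathered)
    print("as_completed:", finished)
```

The gather list matches the order of specs, while the as_completed list is sorted by how long each job took.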
Variants
| Approach | Concurrency | Library | Use When |
|---|---|---|---|
| asyncio + aiohttp | Async | aiohttp | High concurrency, I/O bound |
| httpx async | Async | httpx | Need sync + async in one library |
| ThreadPoolExecutor | Threads | requests | Simple, blocking library |
| httpx sync | None | httpx | Simple, sequential |
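For comparison with the ThreadPoolExecutor row, here is a minimal sketch of the thread-based approach. blocking_fetch is a stand-in that sleeps instead of calling requests.get, so the example runs without a network or third-party packages. Both function names are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_fetch(url: str) -> str:
    # Stand-in for requests.get(url): blocks the calling thread
    time.sleep(0.05)
    return f"response from {url}"

def fetch_all_threaded(urls: list[str], max_workers: int = 8) -> list[str]:
    # Each worker thread handles one blocking call at a time
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in input order
        return list(pool.map(blocking_fetch, urls))

if __name__ == "__main__":
    print(fetch_all_threaded(["u1", "u2", "u3", "u4"]))
```

Here max_workers plays the role that the semaphore plays in the asyncio version: it caps how many calls run at once.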
Guidelines
- Always reuse a single ClientSession for all requests. Creating a session per request defeats connection pooling.
- Use a Semaphore to limit concurrency. Too many parallel requests can overwhelm the server or trigger rate limits.
- Set timeouts with ClientTimeout. aiohttp's default total timeout is 5 minutes, so a hung request can stall a whole batch for a long time.
- Use return_exceptions=True with gather when partial failures are acceptable.
- Implement retries with exponential backoff for transient failures (429, 500, timeouts).
- Use as_completed when you need results as soon as they are available.
- Close sessions properly with the async with context manager.
- Set a reasonable limit_per_host to avoid overwhelming a single server.
Common Mistakes
- Creating a new ClientSession per request. This is slow and wastes connections.
- Not setting a timeout. A hung request keeps its task, and any gather awaiting it, waiting until aiohttp's 5-minute default expires.
- Using requests inside async code. requests is blocking and freezes the event loop.
- Not limiting concurrency. Thousands of parallel requests can exhaust file descriptors or trigger bans.
- Forgetting await on response.json() or response.text(). The call returns a coroutine instead of data.
- Not checking results when using return_exceptions=True. Exceptions are returned as values, not raised.
- Calling asyncio.run() repeatedly for related work. Each call creates and closes a new event loop, so sessions and connections cannot be shared between calls. Prefer one asyncio.run() around a top-level coroutine.
Frequently Asked Questions
Can I use requests with asyncio?
Not directly. requests is a synchronous library, and calling it inside a coroutine blocks the event loop. Use aiohttp or httpx with async support instead, or run the blocking call in a worker thread with asyncio.to_thread (Python 3.9+).
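If a blocking library cannot be replaced, one option is to push each call onto a worker thread with asyncio.to_thread, which is available from Python 3.9. In the sketch below, blocking_call sleeps in place of a requests.get call, and a heartbeat task counts ticks to show that the event loop keeps running in the meantime. All names are hypothetical.

```python
import asyncio
import time

def blocking_call(x: int) -> int:
    time.sleep(0.05)  # stand-in for a blocking requests.get(...)
    return x * 2

async def main() -> tuple[list[int], int]:
    ticks = 0

    async def heartbeat() -> None:
        # Only advances if the event loop is not blocked
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    # Each blocking call runs in its own worker thread
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_call, i) for i in range(3))
    )
    beat.cancel()
    return list(results), ticks

if __name__ == "__main__":
    results, ticks = asyncio.run(main())
    print(results, f"heartbeat ticks: {ticks}")
```

Calling blocking_call directly inside a coroutine would leave the tick count at zero, because the loop could not run the heartbeat while the thread slept.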
What is the difference between gather and as_completed?
gather runs all tasks and returns results in submission order. as_completed yields results as they finish, not in order. Use gather when you need all results together. Use as_completed for progress reporting or streaming results.
How many concurrent requests should I make?
It depends on the server. Start with 10-50 concurrent requests. Check the API’s rate limit documentation. Use a Semaphore to control the number. Monitor for 429 (Too Many Requests) responses.
How do I test async HTTP code?
Use aioresponses to mock aiohttp requests in tests. Write tests as async def and run with pytest-asyncio.
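Where adding aioresponses is not an option, a hand-written fake can stand in for the session. The sketch below is a dependency-free alternative, not the aioresponses API. FakeResponse and FakeSession are hypothetical classes that mimic only the small part of the aiohttp interface that fetch uses, and the example.test URL is made up.

```python
import asyncio

class FakeResponse:
    """Mimics the parts of an aiohttp response that fetch() touches."""

    def __init__(self, payload: dict, status: int = 200):
        self._payload = payload
        self.status = status

    async def __aenter__(self) -> "FakeResponse":
        return self

    async def __aexit__(self, *exc_info) -> None:
        return None

    def raise_for_status(self) -> None:
        if self.status >= 400:
            raise RuntimeError(f"HTTP {self.status}")

    async def json(self) -> dict:
        return self._payload

class FakeSession:
    """Returns canned responses by URL and records what was requested."""

    def __init__(self, routes: dict):
        self._routes = routes
        self.requested: list[str] = []

    def get(self, url: str) -> FakeResponse:
        self.requested.append(url)
        return self._routes[url]

async def fetch(session, url: str) -> dict:
    # Same body as the recipe's fetch(), minus the aiohttp type hint
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()

async def test_fetch_returns_json() -> None:
    url = "https://example.test/u"
    session = FakeSession({url: FakeResponse({"login": "octocat"})})
    assert await fetch(session, url) == {"login": "octocat"}
    assert session.requested == [url]

async def test_fetch_raises_on_error_status() -> None:
    url = "https://example.test/u"
    session = FakeSession({url: FakeResponse({}, status=500)})
    try:
        await fetch(session, url)
    except RuntimeError as e:
        assert str(e) == "HTTP 500"
    else:
        raise AssertionError("expected fetch() to raise")

if __name__ == "__main__":
    asyncio.run(test_fetch_returns_json())
    asyncio.run(test_fetch_raises_on_error_status())
```

Under pytest-asyncio the two test coroutines would be collected as async tests; here they are run with asyncio.run so the example stays within the standard library.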