Skip to content
SP StackPractices
advanced

Concurrency Patterns Guide

A guide to common concurrency patterns and best practices for writing safe, efficient concurrent code.

Introduction

Concurrency enables programs to handle multiple tasks simultaneously. Used correctly, it improves throughput and responsiveness. Used incorrectly, it introduces race conditions, deadlocks, and subtle bugs that are hard to reproduce.

When to Use Concurrency

Use CaseApproach
I/O-bound tasks (HTTP calls, DB queries)Async/await, coroutines
CPU-bound tasks (data processing, ML)Thread pools, multiprocessing
Real-time streamingEvent loops, reactive streams
Background jobsTask queues (Celery, Bull, Sidekiq)

The Thread Pool Pattern

Instead of creating threads per task, reuse a fixed pool.

Python

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch(url):
    return requests.get(url, timeout=10).status_code

urls = ["https://api.example.com/1", "https://api.example.com/2"]

with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch, urls))

Java

ExecutorService executor = Executors.newFixedThreadPool(4);
List<Future<Integer>> futures = urls.stream()
    .map(url -> executor.submit(() -> fetch(url)))
    .toList();

for (Future<Integer> f : futures) {
    System.out.println(f.get());
}
executor.shutdown();

Rule of thumb: Pool size ~ number of CPU cores for CPU-bound tasks, higher for I/O-bound.

Async/Await Pattern

Non-blocking I/O without threads.

JavaScript (Node.js)

async function fetchAll(urls) {
  const promises = urls.map(url => fetch(url));
  const responses = await Promise.all(promises);
  return responses.map(r => r.status);
}

Python

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return response.status

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

Producer-Consumer Pattern

Decouple work generation from work processing.

import asyncio
from asyncio import Queue

async def producer(queue: Queue, items: list):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    await queue.put(None)

async def consumer(queue: Queue, worker_id: int):
    while True:
        item = await queue.get()
        if item is None:
            queue.put_nowait(None)
            break
        print(f"Consumer {worker_id} processing: {item}")
        await asyncio.sleep(0.1)
        queue.task_done()

# Usage
queue = asyncio.Queue(maxsize=10)
items = list(range(20))

await asyncio.gather(
    producer(queue, items),
    consumer(queue, 1),
    consumer(queue, 2),
)

Semaphore for Rate Limiting

Control access to limited resources.

import asyncio

class RateLimitedClient:
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def request(self, url: str):
        async with self.semaphore:
            return await fetch(url)

Avoiding Race Conditions

Immutable Data

The best synchronization is no synchronization.

from dataclasses import dataclass

@dataclass(frozen=True)
class Point:
    x: float
    y: float

# frozen=True makes instances immutable and thread-safe

Atomic Operations

import threading

class SafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1

Read-Write Lock

import java.util.concurrent.locks.ReentrantReadWriteLock;

class CachedData {
    private String data;
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    String read() {
        rwl.readLock().lock();
        try { return data; }
        finally { rwl.readLock().unlock(); }
    }

    void write(String newData) {
        rwl.writeLock().lock();
        try { this.data = newData; }
        finally { rwl.writeLock().unlock(); }
    }
}

Common Pitfalls

ProblemSymptomSolution
Race conditionIntermittent wrong resultsLocks, atomic operations, or immutability
DeadlockThreads freeze waiting for each otherConsistent lock ordering, timeouts
StarvationSome threads never executeFair locks, priority queues
Thread leakMemory grows over timeUse thread pools, always shutdown
Context switchingHigh CPU, low throughputReduce thread count, use async I/O

Best Practices

  • Share nothing: Prefer message passing over shared state
  • Use thread-safe collections: ConcurrentHashMap, Queue, AtomicInteger
  • Keep critical sections small: Hold locks for the minimum time
  • Never call external APIs while holding a lock
  • Test with ThreadSanitizer or Helgrind for race detection

Language Quick Reference

LanguageThreadingAsync I/ONotable APIs
Pythonthreading, multiprocessingasyncio, aiohttpThreadPoolExecutor, Semaphore
JavaScriptWeb WorkersPromise, async/awaitPromise.all, Atomics
Javajava.util.concurrentCompletableFutureExecutorService, CountDownLatch
GoGoroutinesBuilt-in channelssync.WaitGroup, select
Ruststd::threadtokioArc<Mutex<T>>, mpsc

Frequently Asked Questions

When should I use async/await vs threads?

Use async/await for I/O-bound tasks (HTTP calls, file system, databases). Use threads or processes for CPU-bound work (calculations, data processing) that needs parallel execution.

How do I avoid deadlocks?

Always acquire locks in the same order across your codebase. Use timeouts on lock acquisition. Prefer lock-free data structures when possible. The simplest fix is often to reduce shared state.

What is the difference between concurrency and parallelism?

Concurrency is about structuring a program to handle multiple tasks (interleaving). Parallelism is about executing multiple tasks simultaneously (truly at the same time). Async I/O is concurrent; multithreading on multiple cores is parallel.