Usar Estructuras de Datos Concurrentes para Colecciones Thread-Safe
Cómo compartir colecciones entre threads de forma segura usando blocking queues, concurrent maps, copy-on-write lists y atomic counters en Java, Python y C++.
Visión general
Compartir un ArrayList estándar entre threads es peligroso. El thread A lee el índice 0 mientras el thread B elimina el índice 0 — ConcurrentModificationException. El thread A y B llaman map.put("key", value) simultáneamente en un HashMap — la lista enlazada interna puede volverse circular, causando un loop infinito durante la iteración. Estas fallas son no deterministas: pueden pasar miles de tests y fallar solo bajo carga de producción.
Las colecciones estándar (ArrayList, HashMap, LinkedList) no son thread-safe. Envolver cada acceso en synchronized funciona pero serializa todas las operaciones, derrotando el paralelismo. Las estructuras de datos concurrentes son colecciones diseñadas para acceso multi-thread: usan locks de grano fino, algoritmos lock-free o inmutabilidad para permitir lecturas y escrituras concurrentes seguras con mínima contención. Esta receta cubre blocking queues, concurrent maps, copy-on-write collections y atomic counters con ejemplos prácticos.
Cuándo usarlo
Usa esta receta cuando:
- Múltiples threads leen y escriben la misma colección
- Implementando patrones productor-consumidor con backpressure
- Construyendo caches, colas de trabajo o pools de conexiones compartidos por thread pools
- Reemplazando
synchronized(list)oCollections.synchronizedMap()con alternativas de mayor rendimiento - Asegurando visibilidad de escrituras entre threads sin barreras de memoria explícitas
Solución
Blocking Queue (Java)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class OrderProcessor {
private final BlockingQueue<Order> queue = new ArrayBlockingQueue<>(100);
public void submit(Order order) throws InterruptedException {
queue.put(order); // bloquea si la cola está llena
}
public Order take() throws InterruptedException {
return queue.take(); // bloquea si la cola está vacía
}
}
// Productor
Thread producer = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
processor.submit(new Order(i));
}
});
// Pool de consumidores
for (int i = 0; i < 4; i++) {
new Thread(() -> {
while (true) {
try {
Order order = processor.take();
process(order);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
Concurrent Map (Java)
import java.util.concurrent.ConcurrentHashMap;
class InMemoryCache {
private final ConcurrentHashMap<String, CachedValue> cache = new ConcurrentHashMap<>();
public String get(String key, Supplier<String> loader) {
return cache.computeIfAbsent(key, k -> {
String value = loader.get();
return new CachedValue(value, System.currentTimeMillis());
}).value;
}
public void invalidate(String key) {
cache.remove(key);
}
private record CachedValue(String value, long timestamp) {}
}
Python Queue (Thread-Safe)
from queue import Queue
from threading import Thread
class TaskQueue:
def __init__(self, maxsize=100):
self.queue = Queue(maxsize=maxsize)
def submit(self, task):
self.queue.put(task) # bloquea si está llena
def worker(self):
while True:
task = self.queue.get() # bloquea si está vacía
if task is None:
break
self.process(task)
self.queue.task_done()
tq = TaskQueue()
Thread(target=lambda: [tq.submit(i) for i in range(1000)]).start()
for _ in range(4):
Thread(target=tq.worker).start()
Copy-on-Write List (Java)
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
class EventDispatcher {
private final CopyOnWriteArrayList<Consumer<Event>> listeners = new CopyOnWriteArrayList<>();
public void addListener(Consumer<Event> listener) {
listeners.add(listener);
}
public void removeListener(Consumer<Event> listener) {
listeners.remove(listener);
}
public void dispatch(Event event) {
for (Consumer<Event> listener : listeners) {
listener.accept(event);
}
}
}
Explicación
- BlockingQueue: una cola que bloquea productores cuando está llena y consumidores cuando está vacía. Esto provee backpressure natural — un productor rápido no puede abrumar a un consumidor lento.
ArrayBlockingQueueusa un solo lock;LinkedBlockingQueueusa locks separados para head y tail, permitiendo mayor concurrencia para cargas mixtas de lectura/escritura. - ConcurrentHashMap: a diferencia de
Collections.synchronizedMap(), que lockea todo el mapa para cada operación,ConcurrentHashMapusa lock striping — segmentando el mapa en regiones lockeables independientemente. Las lecturas suelen ser lock-free.computeIfAbsentchequea e inserta atómicamente, previniendo la carrera clásica de doble carga en caches. - CopyOnWriteArrayList: cada escritura crea una copia completa del array subyacente. Las lecturas son lock-free y rápidas. Las escrituras son costosas, así que esto es ideal para colecciones con pocas escrituras y muchas lecturas — como listas de listeners de eventos. Un iterador sobre copy-on-write ve un snapshot del momento de creación del iterador.
- AtomicInteger / AtomicLong: no son colecciones, pero son los bloques de construcción de contadores concurrentes, generadores de secuencia y estadísticas.
incrementAndGet()usa una instrucciónCASde CPU, haciéndola lock-free y típicamente más rápida quesynchronizedpara contadores simples.
Variantes
| Estructura | Lecturas | Escrituras | Mejor para | Overhead |
|---|---|---|---|---|
| BlockingQueue | Bloqueante | Bloqueante | Productor-consumidor con backpressure | Lock por op |
| ConcurrentHashMap | Lock-free | Lock striping | Caches de alta concurrencia | Bajo |
| CopyOnWriteArrayList | Lock-free | Copia completa | Pocas escrituras, muchas lecturas | Alta escritura |
| ConcurrentLinkedQueue | Lock-free | Lock-free | Colas de alto throughput | Bajo |
| SynchronizedMap | Lockeada | Lockeada | Migración simple | Alta |
Mejores prácticas
- Prefiere
ConcurrentHashMapsobreCollections.synchronizedMap(): los wrappers sincronizados lockean todo el mapa para cada operación, incluyendoget().ConcurrentHashMappermite lecturas concurrentes y locks más finos para escritura. La diferencia de rendimiento es dramática bajo contención de threads. - Usa
computeIfAbsentpara inicialización perezosa de cache:if (!map.containsKey(key)) map.put(key, load())es una condición de carrera. Dos threads pueden cargar y poner.map.computeIfAbsent(key, k -> load())chequea e inserta atómicamente, asegurando que el loader corre como máximo una vez por clave. - Colas con tamaño limitado para backpressure: una
LinkedBlockingQueueilimitada puede crecer hasta que la JVM se quede sin memoria bajo un productor rápido. Siempre establece un tamaño máximo y usaput()(bloqueante) en lugar deoffer()(no bloqueante) cuando quieres aplicar backpressure. - Copy-on-write para listas de listeners: si tu aplicación registra listeners de eventos al arrancar y raramente los cambia,
CopyOnWriteArrayListda lecturas lock-free. No lo uses para listas frecuentemente actualizadas — el costo de copia por escritura se vuelve prohibitivo. - Itera con
Iterator, nofor-eachen colecciones sincronizadas:for (Item item : synchronizedList)no es atómico. Otro thread puede modificar la lista entre pasos del iterador, lanzandoConcurrentModificationException. Usa bloquessynchronized(list) { ... }explícitos alrededor de la iteración, o usa colecciones concurrentes.
Errores comunes
- Usar
size()para decisiones de cola: chequearif (queue.size() > 0) queue.take()es una condición de carrera. La cola puede quedar vacía entre el chequeo desize()y la llamada atake(). Usa métodos bloqueantes (take(),put()) o no bloqueantes (poll(),offer()) directamente sin prechequeos. - Modificar una colección mientras iteras: incluso
ConcurrentHashMapno soporta modificar el mapa vía el valor retornado poriterator(). UsaIterator.remove()u operaciones bulk (removeIf,replaceAll) en lugar de mutar dentro de un loopfor. - Esperar ordenamiento de
ConcurrentHashMap:ConcurrentHashMapno garantiza orden de iteración. Si necesitas acceso concurrente ordenado, usaConcurrentSkipListMap, que provee ordenamiento tipoTreeMapcon lecturas lock-free. - Olvidar
task_done()enQueuede Python:queue.task_done()debe llamarse después de procesar cada ítem para señalar completitud aqueue.join(). Llamadas faltantes causan quejoin()se cuelgue indefinidamente, esperando tareas que ya fueron procesadas.
Preguntas frecuentes
P: ¿Debería siempre usar colecciones concurrentes en código multithread? R: Si la colección es compartida, sí. Si cada thread tiene su propia colección (ej. un buffer local que se mergea al final), las colecciones estándar son más rápidas y simples. Las colecciones concurrentes tienen overhead que no necesitas para datos thread-local.
P: ¿Es ConcurrentHashMap completamente thread-safe?
R: Las operaciones individuales (get, put, computeIfAbsent) son thread-safe. Las operaciones compuestas (if (!map.containsKey(k)) map.put(k, v)) no lo son. Usa computeIfAbsent, merge, o compute para operaciones compuestas atómicas.
P: ¿Cuándo debería usar CopyOnWriteArrayList vs Collections.synchronizedList?
R: Usa CopyOnWriteArrayList cuando las escrituras son raras (ej. listeners configurados al arrancar) y las lecturas frecuentes. Usa Collections.synchronizedList cuando las escrituras son frecuentes y las lecturas ocasionales — aunque ConcurrentLinkedQueue suele ser mejor que ambos para patrones de acceso tipo cola.
P: ¿Puedo usar colecciones concurrentes desde código async/await?
R: Las colecciones concurrentes de Java funcionan bien con virtual threads y CompletableFuture. En Python, asyncio tiene su propia asyncio.Queue — mezclar threading.Queue con asyncio requiere bridging entre contextos de thread y event loop usando loop.call_soon_threadsafe().
Recursos Relacionados
Coordinate Shared Access with Locks, Mutexes, and Semaphores
How to prevent race conditions in concurrent programs using mutexes, read-write locks, semaphores, and atomic operations in Java, Python, and C++.
RecipeManage Concurrent Work with Thread Pools and Executors
How to efficiently manage worker threads using thread pools, executors, and rejection policies in Java, Python, and C# for CPU-bound and I/O-bound workloads.
RecipeMaster Async Patterns with Promises, Futures, and Coroutines
How to write efficient concurrent code using async/await, promises, futures, and coroutines in JavaScript, Python, and Java for non-blocking I/O and parallel processing.
RecipeDesign Resilient Microservices with Circuit Breakers, Retries, and Timeouts
How to build fault-tolerant distributed systems using microservices patterns including circuit breakers, bulkheads, retries with backoff, and sagas for transaction management.