Skip to content
SP StackPractices
intermediate Por StackPractices

Patrones de Procesamiento por Lotes

Diseña pipelines robustos de procesamiento por lotes para grandes datasets con retry, idempotencia y observabilidad.

Temas: data

Nota para desarrolladores hispanohablantes: Esta guía incluye ejemplos y convenciones de nomenclatura adaptadas a equipos que trabajan en español. Cuando existen diferencias significativas en terminología técnica entre el inglés y el español, se indican explícitamente para facilitar la comunicación en equipos multiculturales.

Visión General

El procesamiento por lotes es la columna vertebral de pipelines de datos, flujos de trabajo ETL y generación de reportes. A diferencia del procesamiento de streams, los trabajos por lotes procesan conjuntos de datos acotados en chunks, lo que los hace más simples de razonar pero requieren atención cuidadosa a la idempotencia, tolerancia a fallos y observabilidad.

Cuándo Usar

Usa este recurso cuando:

  • Procesas grandes datasets que no caben en memoria
  • Construyes pipelines ETL para data warehouses
  • Generas reportes o agregaciones nocturnas
  • Migras datos entre sistemas con ventanas de mantenimiento

Solución

Pipeline Resiliente de Procesamiento por Lotes (Python)

import logging
from typing import Callable, List, Iterator

class BatchProcessor:
    def __init__(self, batch_size: int = 1000, max_retries: int = 3):
        self.batch_size = batch_size
        self.max_retries = max_retries
        self.processed = 0
        self.failed = []

    def process(
        self,
        items: Iterator[dict],
        handler: Callable[[List[dict]], None]
    ) -> dict:
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) >= self.batch_size:
                self._execute(batch, handler)
                batch = []

        if batch:
            self._execute(batch, handler)

        return {"processed": self.processed, "failed": len(self.failed)}

    def _execute(self, batch: List[dict], handler: Callable):
        for attempt in range(self.max_retries):
            try:
                handler(batch)
                self.processed += len(batch)
                return
            except Exception as e:
                logging.warning(f"Batch fallido (intento {attempt + 1}): {e}")
                if attempt == self.max_retries - 1:
                    self.failed.extend(batch)

Seguimiento Idempotente de Trabajos (SQL)

CREATE TABLE job_runs (
    job_id VARCHAR(64) PRIMARY KEY,
    started_at TIMESTAMP NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMP,
    status VARCHAR(20) CHECK (status IN ('running', 'completed', 'failed')),
    checksum VARCHAR(64)
);

-- Antes de comenzar, verifica si ya está completado
SELECT * FROM job_runs WHERE job_id = 'daily_report_2025_01_15' AND status = 'completed';

Explicación

Un pipeline de producción por lotes necesita tres propiedades:

  1. Idempotencia: Ejecutar el mismo trabajo dos veces debe producir el mismo resultado. Usa IDs de trabajo y checksums para saltar trabajo ya procesado.
  2. Tolerancia a fallos: Fallos individuales de batch no deben crashear todo el trabajo. Implementa reintentos con backoff exponencial y una cola de mensajes fallidos.
  3. Observabilidad: Rastrea progreso, throughput y errores. Emite métricas para items procesados, latencia y tasas de fallo.

Estrategia de chunking: Ajusta el tamaño de batches para balancear uso de memoria y throughput. Demasiado pequeño = overhead; demasiado grande = riesgo de OOM.

Variantes

PatrónCaso de UsoCompromiso
Procesamiento por chunksArchivos grandes, límites de memoriaMás simple, mayor latencia
Workers paralelosTransformaciones CPU-boundComplejo, necesita coordinación
MapReduceAgregación distribuidaEscala horizontalmente
Change Data CaptureSincronización incrementalRequiere soporte de la fuente

Mejores Prácticas

  • Diseña para idempotencia: Cada trabajo debe ser seguro de reintentar
  • Registra todo: Inicio de trabajo, fin, y resultado de cada batch
  • Usa transacciones: Envuelve escrituras de batch en transacciones de base de datos
  • Monitorea profundidad de cola: Alerta cuando batches pendientes excedan umbrales
  • Implementa circuit breakers: Detén reintentos si el downstream está unhealthy

Errores Comunes

  1. No manejar fallos parciales: Un batch de 1000 donde 1 falla necesita reintento individual
  2. Ignorar límites de memoria: Cargar datasets enteros en RAM crashea el proceso
  3. Faltar checkpointing: Un trabajo de 6 horas que falla a las 5:55 debe reiniciar desde cero
  4. Pérdida silenciosa de datos: Errores logueados pero no visibles para operadores
  5. Sin estrategia de rollback: Trabajos fallidos dejan la base de datos en estado inconsistente

Preguntas Frecuentes

P: ¿Qué tan grande debería ser cada batch? R: Comienza con 100-1000 items. Haz benchmark con tus datos y restricciones de memoria.

P: ¿Debería usar una cola de trabajos como Celery o un cron job? R: Usa Celery/Redis para sistemas distribuidos y cron para pipelines simples de un solo nodo.

P: ¿Cómo manejo cambios de schema en medio del pipeline? R: Versiona tu lógica de trabajo y schemas de datos. Ejecuta versiones viejas y nuevas en paralelo durante la migración.