Skip to content
SP StackPractices
advanced Por StackPractices

Procesamiento de Streams — Pipelines de Datos Event-Driven con Kafka, Flink y Spark

Guía práctica sobre procesamiento de streams: elegir entre Kafka Streams, Flink y Spark Streaming, diseñar esquemas de eventos, manejar operaciones stateful, y construir pipelines exactly-once para datos en tiempo real.

Nota para desarrolladores hispanohablantes: Esta guía incluye ejemplos y convenciones de nomenclatura adaptadas a equipos que trabajan en español. Cuando existen diferencias significativas en terminología técnica entre el inglés y el español, se indican explícitamente para facilitar la comunicación en equipos multiculturales.

Descripción General

El procesamiento de streams ingiere, transforma y produce datos continuamente a medida que fluyen eventos a través del sistema. A diferencia del procesamiento por batch que procesa datos en chunks programados, el procesamiento de streams maneja cada evento a medida que llega, habilitando reacciones sub-segundo a condiciones cambiantes. Poderiza detección de fraude en tiempo real, recomendaciones en vivo, monitoreo de IoT y analítica operacional.

Esta guía cubre fundamentos de procesamiento de streams, selección de motor, operaciones stateful y patrones de producción con Kafka Streams, Apache Flink y Spark Streaming.

Cuándo Usar

  • Necesitas procesar eventos a medida que llegan, no en batches por hora o diario
  • Tu sistema debe reaccionar a condiciones en segundos (fraude, anomalías, alertas)
  • Necesitas unir múltiples streams de eventos en tiempo real (clicks + transacciones)
  • Estás construyendo un sistema event-sourced donde el log de eventos es la fuente de verdad
  • Necesitas mantener estado agregado que actualiza continuamente (contadores, ventanas)
  • Tu volumen de datos hace el procesamiento por batch demasiado lento o intensivo en recursos

Cuándo NO Usar

  • Tu caso de uso tolera minutos u horas de latencia — ETL por batch es más simple y barato
  • Tus transformaciones requieren acceso a datos históricos que no caben en memoria
  • Necesitas joins complejos multi-tabla a través de sistemas dispares — batch u OLAP es mejor
  • Tu equipo carece de experiencia operativa con procesadores de streams distribuidos
  • El ordenamiento de eventos es crítico pero tu fuente no lo garantiza (la mayoría de logs, algunas APIs)

Conceptos Clave

ConceptoDescripción
Stream de EventosSecuencia ordenada y append-only de eventos
Partición de StreamSubconjunto de eventos dentro de un stream, procesado en paralelo
OffsetLa posición de un evento dentro de una partición (como un cursor)
Grupo de ConsumidoresUn conjunto de consumidores que comparten asignación de particiones
Procesamiento StatefulOperaciones que mantienen y actualizan estado a través de eventos
WatermarkUn timestamp que indica cuándo todos los eventos hasta ese tiempo han sido vistos
CheckpointUn snapshot de estado y offsets para recuperación de fallos

Motores de Procesamiento de Streams

┌─────────────────────────────────────────────────────────────────┐
│                     Fuentes de Eventos                        │
│  (Kafka, Kinesis, Pulsar, RabbitMQ, Logs, APIs)               │
└─────────────────────────────┬───────────────────────────────────┘

          ┌───────────────────┼───────────────────┐
          │                   │                   │
    ┌─────▼─────┐      ┌─────▼─────┐      ┌─────▼─────┐
    │   Flink   │      │  Kafka    │      │   Spark   │
    │           │      │  Streams  │      │  Streaming│
    │ • Complejo│      │ • Simple  │      │ • Batch   │
    │   event   │      │   event   │      │   compat  │
    │   time    │      │   logic   │      │ • Micro-  │
    │   proc    │      │ • Embedded│      │   batches │
    │ • Stateful│      │ • Kafka   │      │ • SQL     │
    │ • Exactly-│      │   only    │      │   support │
    │   once    │      │ • Easy ops│      │           │
    └─────┬─────┘      └─────┬─────┘      └─────┬─────┘
          │                   │                   │
          └───────────────────┼───────────────────┘

                    ┌─────────▼─────────┐
                    │  Destinations     │
                    │  (Database, API,  │
                    │  Another Stream,  │
                    │  Alerting)        │
                    └───────────────────┘

Implementación de Procesamiento de Streams Paso a Paso

1. Diseña Esquemas de Eventos

Esquemas bien diseñados hacen el procesamiento de streams confiable y evolvable:

// Ejemplo: Esquema de evento compatible con CloudEvents
{
  "specversion": "1.0",
  "type": "order.placed",
  "source": "payment-service",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "time": "2024-06-25T14:30:00Z",
  "datacontenttype": "application/json",
  "data": {
    "order_id": "ORD-12345",
    "customer_id": "CUST-987",
    "items": [
      {"sku": "SKU-001", "quantity": 2, "price": 29.99}
    ],
    "total": 59.98,
    "currency": "USD",
    "shipping_address": {
      "country": "ES",
      "zip": "28001"
    }
  },
  "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
}

Principios de diseño de esquemas:

PrincipioPor Qué Importa
Eventos inmutablesLos eventos representan algo que ocurrió; no cambian
Auto-descriptivoIncluye todo el contexto necesario para procesar (no requiere lookups)
Timestamps UTCEl tiempo del evento es crítico para windowing y ordenamiento
IDs únicosHabilita idempotencia y deduplicación
Correlation IDsTraza eventos a través de múltiples etapas de procesamiento
Schema registryEnforce compatibilidad (Avro, Protobuf, JSON Schema)
// Ejemplo: Validación de esquema con JSON Schema
from jsonschema import validate, ValidationError
import json

ORDER_PLACED_SCHEMA = {
    "type": "object",
    "required": ["specversion", "type", "source", "id", "time", "data"],
    "properties": {
        "specversion": {"type": "string", "enum": ["1.0"]},
        "type": {"type": "string"},
        "source": {"type": "string"},
        "id": {"type": "string", "format": "uuid"},
        "time": {"type": "string", "format": "date-time"},
        "data": {
            "type": "object",
            "required": ["order_id", "customer_id", "total"],
            "properties": {
                "order_id": {"type": "string"},
                "customer_id": {"type": "string"},
                "total": {"type": "number", "minimum": 0},
                "currency": {"type": "string", "enum": ["USD", "EUR", "GBP"]}
            }
        }
    }
}

def validate_event(event):
    try:
        validate(instance=event, schema=ORDER_PLACED_SCHEMA)
        return True, None
    except ValidationError as e:
        return False, str(e)

2. Implementa Kafka Streams

Kafka Streams es la opción más simple para arquitecturas centradas en Kafka:

// Ejemplo: Aplicación Kafka Streams para analítica de pedidos
public class OrderAnalyticsApp {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass().getName());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Leer eventos de pedidos
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), new OrderSerde()));
        
        // Filtrar pedidos de alto valor
        KStream<String, Order> highValueOrders = orders
            .filter((key, order) -> order.getTotal() > 100.0);
        
        // Enriquecer con datos de cliente (KTable lookup)
        KTable<String, Customer> customers = builder.table("customers",
            Consumed.with(Serdes.String(), new CustomerSerde()));
        
        KStream<String, EnrichedOrder> enrichedOrders = highValueOrders
            .leftJoin(customers, (order, customer) -> new EnrichedOrder(order, customer));
        
        // Escribir a tópico de salida
        enrichedOrders.to("enriched-orders",
            Produced.with(Serdes.String(), new EnrichedOrderSerde()));
        
        // Ventana tumbling: revenue por categoría por hora
        orders
            .groupBy((key, order) -> order.getCategory())
            .windowedBy(TimeWindows.of(Duration.ofHours(1)))
            .aggregate(
                () -> 0.0,
                (key, order, total) -> total + order.getTotal(),
                Materialized.with(Serdes.String(), Serdes.Double())
            )
            .toStream()
            .to("hourly-revenue-by-category");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Patrones de Kafka Streams:

PatrónDescripciónCaso de Uso
KStream → KStreamTransformación evento-a-eventoFiltrado, mapeo, branching
KStream → KTableAgregación en estadoConteo, suma, agrupación
KTable → KStreamSalida de changelogPublicar agregados actualizados
KStream + KTableEnriquecimiento de streamLookup joins con datos de referencia
KStream + KStreamJoin de streamsCoincidir eventos de dos streams
Windowed aggregationAgrupación con límites de tiempoMétricas por hora, análisis de sesión

Flink es el motor más capaz para procesamiento de streams complejo:

// Ejemplo: Job de Flink para detección de fraude en tiempo real
public class FraudDetectionJob {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = 
            StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Configurar checkpointing para exactly-once
        env.enableCheckpointing(60000);  // Checkpoints de 60 segundos
        env.getCheckpointConfig().setCheckpointingMode(
            CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckups(30000);
        
        // Fuente Kafka con watermarking de event-time
        KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("transactions")
            .setGroupId("fraud-detection")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new TransactionDeserializationSchema())
            .build();
        
        DataStream<Transaction> transactions = env.fromSource(
            source,
            WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(
                Duration.ofSeconds(30))
                .withTimestampAssigner((transaction, timestamp) -> 
                    transaction.getTimestamp()),
            "Transactions"
        );
        
        // Stream keyed por cuenta para estado por cuenta
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetectionFunction());
        
        // Sink de alertas a Kafka
        KafkaSink<Alert> sink = KafkaSink.<Alert>builder()
            .setBootstrapServers("kafka:9092")
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                    .setTopic("fraud-alerts")
                    .setValueSerializationSchema(new AlertSerializationSchema())
                    .build()
            )
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .build();
        
        alerts.sinkTo(sink);
        env.execute("Fraud Detection");
    }
    
    // Función de detección de fraude con estado
    public static class FraudDetectionFunction 
            extends KeyedProcessFunction<String, Transaction, Alert> {
        
        private ValueState<Double> lastAmountState;
        private ValueState<Long> lastTimestampState;
        private ListState<Transaction> recentTransactionsState;
        
        @Override
        public void open(Configuration parameters) {
            lastAmountState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastAmount", Types.DOUBLE));
            lastTimestampState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastTimestamp", Types.LONG));
            recentTransactionsState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("recentTxns", Transaction.class));
        }
        
        @Override
        public void processElement(Transaction txn, Context ctx, Collector<Alert> out) 
                throws Exception {
            
            Double lastAmount = lastAmountState.value();
            Long lastTimestamp = lastTimestampState.value();
            
            // Regla 1: Pico de monto (>3x anterior)
            if (lastAmount != null && txn.getAmount() > lastAmount * 3) {
                out.collect(new Alert(txn.getAccountId(), "AMOUNT_SPIKE", 
                    txn.getAmount(), txn.getTimestamp()));
            }
            
            // Regla 2: Velocidad (3+ transacciones en 1 minuto)
            recentTransactionsState.add(txn);
            long oneMinuteAgo = txn.getTimestamp() - 60000;
            
            Iterable<Transaction> recent = recentTransactionsState.get();
            int count = 0;
            for (Transaction t : recent) {
                if (t.getTimestamp() > oneMinuteAgo) count++;
            }
            
            if (count >= 3) {
                out.collect(new Alert(txn.getAccountId(), "VELOCITY", 
                    txn.getAmount(), txn.getTimestamp()));
            }
            
            // Actualizar estado
            lastAmountState.update(txn.getAmount());
            lastTimestampState.update(txn.getTimestamp());
        }
    }
}

Patrones de Flink:

PatrónAPICaso de Uso
Agregación con ventanakeyBy(...).window(...).aggregate(...)Métricas por hora, resúmenes diarios
Ventanas de event-timeWatermarkStrategy.forBoundedOutOfOrderness(...)Resultados correctos a pesar de eventos tardíos
Operadores statefulValueState, ListState, MapStateTracking de sesión, detección de fraude
Async I/OAsyncDataStream.unorderedWait(...)Enriquecimiento con llamadas a API externas
CEP (Complex Event Processing)CEP.pattern(...)Matching de patrones multi-evento
Side outputsOutputTag + ctx.output(...)Dead letter queue, manejo de datos tardíos

4. Maneja Estado y Tolerancia a Fallos

El estado es la parte más difícil del procesamiento de streams:

// Ejemplo: Configuración de backend de estado de Flink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Opciones: MemoryStateBackend, FsStateBackend, RocksDBStateBackend
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");

// TTL (time-to-live) de estado para garbage collection
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()
    .build();

ValueStateDescriptor<MyState> descriptor = new ValueStateDescriptor<>("myState", MyState.class);
descriptor.enableTimeToLive(ttlConfig);

Estrategias de manejo de estado:

EstrategiaMejor ParaTrade-off
En memoria (HashMap)Estado pequeño, ventanas cortasRápido, pero se pierde en fallo
RocksDBEstado grande, ventanas largasMás lento, pero escala a TBs
Almacenamiento externoEstado compartido entre jobsAñade latencia y complejidad
Checkpoints incrementalesEstado grande que cambia lentamenteReduce tiempo de checkpoint

5. Implementa Procesamiento Exactly-Once

La semántica exactly-once asegura que cada evento se procesa una vez a pesar de fallos:

// Configuración Kafka + Flink exactly-once
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "exactly-once-job");

// Configuración de producer para escrituras idempotentes
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", "5");

// Configuración exactly-once de Flink
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Sink con escrituras transaccionales
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
    "output-topic",
    new SimpleStringSchema(),
    props,
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);

Garantías de entrega:

GarantíaComportamientoCaso de Uso
At-most-onceLos eventos pueden perderseMétricas no críticas, logs
At-least-onceLos eventos pueden duplicarseLa mayoría de analítica, conteo
Exactly-onceSin pérdida, sin duplicadosTransacciones financieras, facturación

Mejores Prácticas

  • Usa event time, no processing time. El processing time es no confiable a través de reinicios y replays. Event time con watermarks da resultados correctos.
  • Mantén estado acotado. Usa TTL, expiración de ventana y limpieza periódica para prevenir crecimiento ilimitado de estado.
  • Sinks idempotentes. Incluso con exactly-once, diseña tus consumidores downstream para manejar duplicados elegantemente.
  • Monitorea lag. El consumer lag es la métrica operacional primaria para salud de procesamiento de streams.
  • Prueba con replay. Reproduce eventos históricos a través de tu job para validar corrección y rendimiento.
  • Evolución de esquema. Usa Confluent Schema Registry o similar para enforce compatibilidad backward/forward.

Errores Comunes

  • Ventanas de processing time. Los resultados difieren en cada replay. Siempre usa event time para agregaciones.
  • Crecimiento ilimitado de estado. Olvidar configurar TTL en estado lleva a crashes OOM después de días o semanas.
  • Ignorar backpressure. Cuando los consumidores no pueden mantenerse al día, ocurre pérdida de datos o fallos en cascada. Monitorea y escala.
  • Sin dead letter queue. Eventos inválidos no deberían crashear el pipeline. Róutalos a un DLQ para inspección.
  • Operaciones stateful sin checkpoints. Un reinicio del job pierde todo el estado y debe reprocesar desde el principio.
  • Kafka auto.offset.reset=latest. Esto silenciosamente saltea datos en nuevos grupos de consumidores. Usa earliest u offsets explícitos.

Variantes

  • Kafka Streams: Librería embebida, no requiere cluster separado — mejor para transformaciones simples sobre Kafka
  • Flink: Procesador de streams full-featured — mejor para procesamiento complejo de event time y operaciones stateful
  • Spark Streaming: Procesamiento micro-batch — mejor para equipos ya usando Spark, o cuando unificación batch+streaming importa
  • ksqlDB: Interfaz SQL sobre Kafka Streams — mejor para procesamiento declarativo de streams sin Java
  • Pulsar Functions: Computación liviana sobre Apache Pulsar — mejor para arquitecturas centradas en Pulsar

FAQ

P: ¿Debería usar Kafka Streams o Flink? Usa Kafka Streams si tu lógica es simple (filter, map, aggregate) y ya eres Kafka-centric. Usa Flink para windowing complejo, semántica de event time, CEP, o cuando necesitas procesar desde múltiples fuentes más allá de Kafka.

P: ¿Cómo manejo eventos que llegan tarde? Usa watermarks con lateness permitida. Flink soporta allowedLateness() en ventanas. Kafka Streams soporta períodos de gracia. Eventos que llegan después del período de gracia van a un side output o son descartados.

P: ¿Cuál es la diferencia entre procesamiento de streams y analítica en tiempo real? El procesamiento de streams es el motor que transforma eventos. La analítica en tiempo real es el sistema end-to-end que incluye colección, procesamiento, almacenamiento y visualización. El procesamiento de streams alimenta a la analítica en tiempo real.

P: ¿Puedo usar SQL para procesamiento de streams? Sí. Flink SQL, ksqlDB y Spark Structured Streaming soportan SQL sobre streams. Estos son excelentes para agregaciones simples y joins. Lógica stateful compleja aún requiere la API DataStream/funcional.

Conclusión

El procesamiento de streams habilita sistemas que reaccionan a eventos a medida que ocurren. Al elegir el motor correcto, diseñar esquemas de eventos inmutables, manejar estado cuidadosamente e implementar semántica exactly-once, construyes pipelines que procesan millones de eventos por segundo con garantías de corrección.

Recursos Relacionados