Skip to content
SP StackPractices
advanced Por StackPractices

Guía Completa de Kafka Stream Processing

Construye pipelines de event streaming en tiempo real con Kafka. Cubre producers, consumers, Kafka Streams, Kafka Connect, schema registry y patrones de procesamiento.

Nota para desarrolladores hispanohablantes: Esta guía incluye ejemplos y convenciones de nomenclatura adaptadas a equipos que trabajan en español. Cuando existen diferencias significativas en terminología técnica entre el inglés y el español, se indican explícitamente para facilitar la comunicación en equipos multiculturales.

Guía Completa de Kafka Stream Processing

Introducción

Apache Kafka es una plataforma distribuida de event streaming. Maneja trillones de eventos por día en empresas como LinkedIn, Uber y Netflix. Esta guía cubre conceptos core de Kafka, producers, consumers, Kafka Streams API, Kafka Connect, Schema Registry y patrones comunes de stream processing con ejemplos en Python, Java y JavaScript.

Conceptos Core

  • Topic: Un stream nombrado de eventos (como una categoría)
  • Partition: Un topic se divide en partitions para paralelismo; cada partition es un log ordenado y append-only
  • Offset: La posición de un mensaje dentro de una partition
  • Consumer Group: Un grupo de consumers que comparten partitions de un topic
  • Broker: Un servidor Kafka que almacena y sirve mensajes
  • Producer: Una aplicación que publica eventos a topics
  • Consumer: Una aplicación que se suscribe a topics y procesa eventos

Producer

Python (aiokafka)

from aiokafka import AIOKafkaProducer
import json
import asyncio

async def produce_events():
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        key_serializer=lambda k: k.encode("utf-8") if k else None,
        acks="all",
        enable_idempotence=True,
    )
    await producer.start()
    try:
        for i in range(100):
            await producer.send_and_wait(
                "orders",
                key=f"order-{i}",
                value={"order_id": i, "amount": 99.99 * i, "status": "created"},
            )
    finally:
        await producer.stop()

asyncio.run(produce_events())

Java (Kafka Producer)

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class OrderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("enable.idempotence", "true");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    "orders",
                    "order-" + i,
                    "{\"order_id\":" + i + ",\"amount\":" + (99.99 * i) + "}"
                );
                producer.send(record, (metadata, e) -> {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.printf("Sent to partition %d offset %d%n",
                            metadata.partition(), metadata.offset());
                    }
                });
            }
        }
    }
}

Consumer

Python (aiokafka)

from aiokafka import AIOKafkaConsumer
import json
import asyncio

async def consume_events():
    consumer = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:9092",
        group_id="order-processor",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        key_deserializer=lambda k: k.decode("utf-8") if k else None,
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(f"Topic={msg.topic}, Partition={msg.partition}, Offset={msg.offset}")
            print(f"Key={msg.key}, Value={msg.value}")
            await consumer.commit()
    finally:
        await consumer.stop()

asyncio.run(consume_events())

JavaScript (KafkaJS)

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
    clientId: "order-consumer",
    brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "order-processor" });

async function consumeEvents() {
    await consumer.connect();
    await consumer.subscribe({ topic: "orders", fromBeginning: true });

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            const event = JSON.parse(message.value.toString());
            console.log(`Processing order ${event.order_id}, amount: ${event.amount}`);

            // Procesar la orden...
        },
    });
}

consumeEvents();

Kafka Streams (Java)

Kafka Streams es una librería para construir aplicaciones de stream processing en tiempo real.

Filter y branch

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class OrderStreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "order-processor");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");

        // Branch por amount
        KStream<String, String>[] branches = orders.branch(
            (key, value) -> value.contains("\"amount\":0"),
            (key, value) -> true
        );

        KStream<String, String> zeroOrders = branches[0];
        KStream<String, String> validOrders = branches[1];

        // Filter órdenes high-value
        KStream<String, String> highValue = validOrders.filter(
            (key, value) -> {
                // Parsear y chequear amount > 1000
                return value.contains("\"amount\":1");
            }
        );

        highValue.to("high-value-orders");
        validOrders.to("processed-orders");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Aggregation (count por key)

KTable<String, Long> orderCounts = orders
    .groupBy((key, value) -> key, Grouped.with(
        Serdes.String(),
        Serdes.String()
    ))
    .count();

orderCounts.toStream().to("order-counts");

Windowed aggregation (ventanas tumbling de 5 minutos)

KTable<Windowed<String>, Long> windowedCounts = orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

windowedCounts.toStream((key, value) -> key.key() + "@" + key.window().start())
    .to("windowed-order-counts");

Joining streams

KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");

KStream<String, String> enriched = orders.join(
    payments,
    (orderValue, paymentValue) -> orderValue + "|" + paymentValue,
    JoinWindows.of(Duration.ofMinutes(5)),
    Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
);

enriched.to("enriched-orders");

Schema Registry

Schema Registry asegura compatibilidad entre producers y consumers enforceando un schema (Avro, Protobuf, JSON Schema).

Registrar un schema Avro

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_conf = {"url": "http://localhost:8081"}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

schema_str = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "int"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string"}
  ]
}
"""

avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer_conf = {
    "bootstrap.servers": "localhost:9092",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": avro_serializer,
}

producer = SerializingProducer(producer_conf)
producer.produce(
    topic="orders-avro",
    key="order-1",
    value={"order_id": 1, "amount": 99.99, "status": "created"},
)
producer.flush()

Kafka Connect

Kafka Connect es un framework para conectar Kafka con sistemas externos (bases de datos, colas, file systems).

JDBC Source Connector

{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "secret",
    "table.whitelist": "orders",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

Elasticsearch Sink Connector

{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "processed-orders",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "schema.ignore": "true"
  }
}

Patrones de Stream Processing

PatrónDescripciónCaso de Uso
FilterDescartar eventos que no matchean un predicadoRemover órdenes inválidas
MapTransformar cada eventoEnriquecer con data de customer
AggregateGroup y reduceContar órdenes por customer
WindowAgregar sobre tiempoRolling average de 5 minutos
JoinCombinar dos streamsÓrdenes + pagos
SplitRutear a múltiples topicsÓrdenes high/low value
RepartitionRe-keyear un streamGroup por customer_id en vez de order_id

Pautas

  • Usar acks=all para producers — asegura que los datos se escriban a múltiples réplicas
  • Habilitar producers idempotentes — previene duplicados en retries
  • Setear enable.auto.commit=false — commitear offsets después de procesar, no antes
  • Manejar errores de deserialización — usar un dead-letter topic para poison pills
  • Usar Schema Registry — enforce compatibilidad de schema, prevenir breaking consumers
  • Particionar por key para ordering — eventos con el mismo key van a la misma partition
  • Sizear partitions correctamente — apuntar a 10-50 MB/s throughput por partition
  • Monitorear consumer lag — lag creciente significa que los consumers no pueden seguir
  • Usar exactly-once semantics para pipelines críticos — setear processing.guarantee=exactly_once_v2
  • Setear retención sabiamente — usar retención por tiempo (7 días) o por tamaño (1GB) por topic
  • Usar consumer groups para paralelismo — cada consumer en un group obtiene un subset de partitions

Errores Comunes

  • No manejar poison pills — un mensaje malo bloquea toda la partition
  • Usar auto.offset.reset=latest — consumers nuevos se pierden datos históricos
  • No setear max.poll.interval.ms — consumers lentos son kickeados del group
  • Over-partitioning — demasiadas partitions aumentan overhead y latencia
  • No monitorear lag — los issues aparecen solo cuando los usuarios se quejan
  • Usar Kafka como database — Kafka es un log, no un store queryable
  • No usar Schema Registry — cambios de schema rompen consumers silenciosamente
  • Producir sin key — los eventos se distribuyen random, perdiendo garantías de ordering

Preguntas Frecuentes

¿Cuántas partitions debería tener un topic?

Comenzar con 6-12 partitions para la mayoría de topics. Añadir partitions si el consumer lag crece o el throughput excede 50 MB/s por partition. No puedes reducir partitions después — solo añadir. Planear para peak throughput.

¿Cómo aseguro exactly-once processing?

Usar Kafka Streams con processing.guarantee=exactly_once_v2. Para loops consumer-producer, usar la transactional API: producer.initTransactions(), beginTransaction(), send + commit offset, commitTransaction().

¿Cuál es la diferencia entre Kafka Streams y Kafka Consumers?

Kafka Consumers son low-level — manejas offsets, retries y state vos mismo. Kafka Streams es una librería high-level que maneja state management, exactly-once, rebalancing y local state stores. Usar Kafka Streams para stateful processing (aggregations, joins). Usar consumers para simple event processing.