Skip to content
SP StackPractices
intermediate Por Mathias Paulenko

Task Queues y RPC con RabbitMQ y AMQP

Implementa distribucion confiable de tareas y patrones request-reply usando RabbitMQ con durable queues, dead-letter exchanges y prefetch para concurrencia controlada

Nota para desarrolladores hispanohablantes: Esta guía incluye ejemplos y convenciones de nomenclatura adaptadas a equipos que trabajan en español. Cuando existen diferencias significativas en terminología técnica entre el inglés y el español, se indican explícitamente para facilitar la comunicación en equipos multiculturales.

Task Queues y RPC con RabbitMQ y AMQP

Distribuye tareas de background confiablemente e implementa patrones request-reply usando RabbitMQ. Esta recipe cubre durable queues, dead-letter exchanges para mensajes fallidos, limites de prefetch para concurrencia controlada, y RPC sobre AMQP para llamadas sincronicas entre servicios.

Cuando Usar Esto

  • Jobs de background (procesamiento de imagenes, envio de emails) no deben bloquear el request flow principal
  • Tareas fallidas deberian reintentarse con exponential backoff o enrutarse a dead-letter queues
  • Los servicios necesitan comunicacion RPC sincronica sin overhead de HTTP

Solucion

1. Producer con Durable Queue

// rabbitmq/producer.ts
import amqp from 'amqplib';

const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();

// Durable queue sobrevive restart del broker
await channel.assertQueue('email.tasks', {
  durable: true,
});

// Dead letter exchange para mensajes fallidos
await channel.assertExchange('dlx', 'direct');
await channel.assertQueue('email.tasks.dlq', { durable: true });
await channel.bindQueue('email.tasks.dlq', 'dlx', 'email.tasks');

async function sendEmailTask(email: unknown): Promise<void> {
  channel.sendToQueue('email.tasks', Buffer.from(JSON.stringify(email)), {
    persistent: true,
    headers: { 'x-attempt': 1 },
  });
}

2. Worker con Prefetch y Ack

// rabbitmq/worker.ts
const channel = await connection.createChannel();

await channel.prefetch(5); // Procesa 5 mensajes concurrentemente por worker

await channel.consume('email.tasks', async (msg) => {
  if (!msg) return;

  const email = JSON.parse(msg.content.toString());
  const attempt = msg.properties.headers?.['x-attempt'] || 1;

  try {
    await sendEmail(email);
    channel.ack(msg); // Remueve de la queue en exito
  } catch (error) {
    if (attempt >= 3) {
      // Rechaza y envia a dead letter queue
      channel.reject(msg, false);
    } else {
      // Nack y requeue para reintento
      channel.nack(msg, false, true);

      // Publica con attempt incrementado
      channel.sendToQueue('email.tasks', msg.content, {
        persistent: true,
        headers: { 'x-attempt': attempt + 1 },
      });
    }
  }
});

3. Patron RPC Request-Reply

// rabbitmq/rpc-client.ts
async function rpcCall(queue: string, payload: unknown): Promise<unknown> {
  const correlationId = generateId();
  const { queue: replyQueue } = await channel.assertQueue('', { exclusive: true });

  return new Promise((resolve, reject) => {
    const timeout = setTimeout(() => reject(new Error('RPC timeout')), 5000);

    channel.consume(replyQueue, (msg) => {
      if (msg?.properties.correlationId === correlationId) {
        clearTimeout(timeout);
        resolve(JSON.parse(msg.content.toString()));
        channel.ack(msg);
      }
    });

    channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
      replyTo: replyQueue,
      correlationId,
      expiration: '5000',
    });
  });
}

// rabbitmq/rpc-server.ts
await channel.assertQueue('calc.multiply');
await channel.consume('calc.multiply', (msg) => {
  if (!msg) return;

  const { a, b } = JSON.parse(msg.content.toString());
  const result = a * b;

  channel.sendToQueue(
    msg.properties.replyTo,
    Buffer.from(JSON.stringify({ result })),
    { correlationId: msg.properties.correlationId }
  );

  channel.ack(msg);
});

4. Docker Compose Setup

# docker-compose.rabbitmq.yml
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

volumes:
  rabbitmq_data:

Como Funciona

  • Exchanges enrutan mensajes a queues basandose en reglas de binding
  • Durable queues persisten mensajes a traves de restarts del broker
  • Prefetch limita mensajes no acknowledged por consumer para prevenir overload
  • Dead-letter exchanges reciben mensajes que son rechazados o expiran
  • RPC usa reply queues y correlation IDs para matchear responses a requests

Consideraciones de Produccion

  • Usa quorum queues para almacenamiento de mensajes replicado y fault-tolerant
  • Monitorea queue depth con el management plugin o Prometheus exporter
  • Implementa circuit breakers en el lado del producer cuando queue depth excede thresholds

Errores Comunes

  • No hacer ack de mensajes, causando agotamiento de memoria en el broker
  • Usar auto-ack para tareas de larga duracion que pueden fallar
  • Crear reply queues sin cleanup, causando queue leaks en RPC

FAQ

P: En que se diferencia de Kafka? R: RabbitMQ soporta routing complejo, RPC y menor latencia por mensaje. Kafka se destaca en log streaming de alto throughput y replay.

P: Deberia usar topic o direct exchanges? R: Usa direct para routing simple por key. Usa topic para routing basado en patrones (ej. orders.*.created).