Skip to content
SP StackPractices
advanced Por StackPractices

Patron de Programador-Agente-Supervisor

Coordina la programacion de trabajos resilientes separando la logica de programacion de los agentes de ejecucion y agregando un supervisor que monitorea, reinicia y gestiona el ciclo de vida.

Nota para desarrolladores hispanohablantes: Esta guía incluye ejemplos y convenciones de nomenclatura adaptadas a equipos que trabajan en español. Cuando existen diferencias significativas en terminología técnica entre el inglés y el español, se indican explícitamente para facilitar la comunicación en equipos multiculturales.

Patron de Programador-Agente-Supervisor

Resumen

El Patron de Programador-Agente-Supervisor estructura el procesamiento de trabajos distribuidos en tres roles distintos: un Programador que decide que trabajo hacer y cuando, Agentes que ejecutan el trabajo real, y un Supervisor que monitorea agentes, maneja fallas y gestiona el ciclo de vida del sistema.

Esta separacion de responsabilidades hace al sistema resiliente ante caidas de agentes, particiones de red y workers no responsivos. El supervisor detecta agentes fallidos, los reinicia, redistribuye su trabajo y asegura que la programacion continua incluso cuando componentes individuales fallan. Es la base de muchos sistemas de orquestacion, desde los arboles de supervision OTP de Erlang hasta los controladores de Kubernetes.

Cuando Usar

  • Trabajos de larga duracion o de fondo que deben sobrevivir fallas de maquinas individuales
  • Ejecucion de tareas distribuidas donde los workers pueden fallar o volverse inalcanzables
  • Sistemas que requieren recuperacion automatica, reintentos y aislamiento de fallas
  • Flujos de trabajo de multiples pasos donde cada paso se supervisa independientemente
  • Entornos donde los ejecutores de trabajos (agentes) corren en infraestructura heterogenea o efimera

Cuando Evitar

  • Trabajos cron simples en un solo servidor donde gestores de procesos (systemd, PM2) son suficientes
  • Manejo de solicitudes HTTP sin estado donde las fallas se muestran inmediatamente al llamador
  • Sistemas donde el overhead de supervision excede el costo de fallas ocasionales
  • Tareas de muy corta duracion donde la eleccion de supervisor y verificaciones de heartbeat agregan latencia inaceptable

Solucion

Python (Supervisor Personalizado con Agentes)

import asyncio
import time
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional, Callable
from enum import Enum

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Job:
    id: str
    task: Callable
    status: JobStatus = JobStatus.PENDING
    agent_id: Optional[str] = None
    retries: int = 0
    max_retries: int = 3

class Agent:
    def __init__(self, agent_id: str):
        self.id = agent_id
        self.current_job: Optional[Job] = None
        self.last_heartbeat = time.time()

    async def execute(self, job: Job) -> bool:
        self.current_job = job
        job.status = JobStatus.RUNNING
        job.agent_id = self.id
        try:
            result = await asyncio.wait_for(self._run_job(job), timeout=30.0)
            job.status = JobStatus.COMPLETED
            return True
        except Exception as e:
            job.status = JobStatus.FAILED
            return False
        finally:
            self.current_job = None
            self.last_heartbeat = time.time()

    async def _run_job(self, job: Job):
        await asyncio.sleep(0.5)
        if callable(job.task):
            return job.task()
        return job.task

    def is_healthy(self) -> bool:
        return time.time() - self.last_heartbeat < 15

class Supervisor:
    def __init__(self, check_interval: float = 10.0):
        self.agents: Dict[str, Agent] = {}
        self.jobs: Dict[str, Job] = {}
        self.check_interval = check_interval

    def add_agent(self, agent: Agent):
        self.agents[agent.id] = agent

    def submit_job(self, task: Callable) -> str:
        job = Job(id=str(uuid.uuid4()), task=task)
        self.jobs[job.id] = job
        return job.id

    async def start(self):
        asyncio.create_task(self._monitor_loop())

    async def _monitor_loop(self):
        while True:
            await asyncio.sleep(self.check_interval)
            await self._check_agents()
            await self._redistribute_work()

    async def _check_agents(self):
        failed = [aid for aid, a in self.agents.items() if not a.is_healthy()]
        for aid in failed:
            agent = self.agents[aid]
            if agent.current_job and agent.current_job.status == JobStatus.RUNNING:
                agent.current_job.status = JobStatus.FAILED
                agent.current_job.retries += 1
            del self.agents[aid]
            new_agent = Agent(f"agent-{uuid.uuid4().hex[:8]}")
            self.add_agent(new_agent)

    async def _redistribute_work(self):
        available = [a for a in self.agents.values() if a.current_job is None and a.is_healthy()]
        pending = [j for j in self.jobs.values() if j.status in (JobStatus.PENDING, JobStatus.FAILED) and j.retries < j.max_retries]
        for job in pending:
            if not available:
                break
            agent = available.pop(0)
            if job.status == JobStatus.FAILED:
                job.status = JobStatus.PENDING
            asyncio.create_task(self._execute_job(agent, job))

    async def _execute_job(self, agent: Agent, job: Job):
        success = await agent.execute(job)
        if not success and job.retries < job.max_retries:
            job.retries += 1
            job.status = JobStatus.PENDING

Java (Supervision Estilo Akka)

public class SchedulerAgentSupervisor {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService workerPool;
    private final Map<String, Agent> agents = new ConcurrentHashMap<>();
    private final Map<String, Job> jobs = new ConcurrentHashMap<>();
    private final AtomicInteger agentCounter = new AtomicInteger(0);
    private volatile boolean running;

    public SchedulerAgentSupervisor(int workerPoolSize) {
        this.scheduler = Executors.newScheduledThreadPool(2);
        this.workerPool = Executors.newFixedThreadPool(workerPoolSize);
    }

    public void start() {
        running = true;
        scheduler.scheduleAtFixedRate(this::monitorAgents, 5, 5, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(this::schedulePendingJobs, 1, 1, TimeUnit.SECONDS);
    }

    public String spawnAgent() {
        String agentId = "agent-" + agentCounter.incrementAndGet();
        agents.put(agentId, new Agent(agentId));
        return agentId;
    }

    public String submitJob(Runnable task) {
        String jobId = "job-" + UUID.randomUUID().toString().substring(0, 8);
        jobs.put(jobId, new Job(jobId, task));
        return jobId;
    }

    private void monitorAgents() {
        List<String> failed = new ArrayList<>();
        for (Agent agent : agents.values()) {
            if (!agent.isHealthy()) {
                failed.add(agent.getId());
                if (agent.getCurrentJob() != null) {
                    agent.getCurrentJob().setStatus(JobStatus.FAILED);
                    agent.getCurrentJob().incrementRetries();
                }
            }
        }
        for (String id : failed) {
            agents.remove(id);
            spawnAgent();
        }
    }

    private void schedulePendingJobs() {
        List<Agent> available = agents.values().stream()
            .filter(a -> a.getCurrentJob() == null && a.isHealthy()).toList();
        List<Job> pending = jobs.values().stream()
            .filter(Job::canRetry).toList();
        int limit = Math.min(available.size(), pending.size());
        for (int i = 0; i < limit; i++) {
            dispatchJob(available.get(i), pending.get(i));
        }
    }

    private void dispatchJob(Agent agent, Job job) {
        job.setStatus(JobStatus.RUNNING);
        job.setAgentId(agent.getId());
        agent.setCurrentJob(job);
        workerPool.submit(() -> {
            try {
                job.getTask().run();
                job.setStatus(JobStatus.COMPLETED);
            } catch (Exception e) {
                job.setStatus(JobStatus.FAILED);
                job.incrementRetries();
            } finally {
                agent.setCurrentJob(null);
                agent.recordHeartbeat();
            }
        });
    }
}

JavaScript (Node.js con Clustering estilo PM2)

const cluster = require('cluster');
const os = require('os');

class JobSupervisor {
    constructor(options = {}) {
        this.workers = new Map();
        this.jobs = new Map();
        this.maxWorkers = options.maxWorkers || os.cpus().length;
        this.heartbeatTimeout = options.heartbeatTimeout || 10000;
        this.checkInterval = options.checkInterval || 5000;
    }

    start() {
        if (cluster.isPrimary) {
            this._startSupervisor();
        } else {
            this._startAgent();
        }
    }

    _startSupervisor() {
        for (let i = 0; i < this.maxWorkers; i++) {
            this._spawnWorker();
        }
        setInterval(() => this._monitorWorkers(), this.checkInterval);
        cluster.on('exit', (worker, code, signal) => {
            this.workers.delete(worker.id);
            this._spawnWorker();
        });
    }

    _spawnWorker() {
        const worker = cluster.fork();
        this.workers.set(worker.id, {
            worker, lastHeartbeat: Date.now(), currentJob: null
        });
    }

    _monitorWorkers() {
        const now = Date.now();
        for (const [id, info] of this.workers) {
            if (now - info.lastHeartbeat > this.heartbeatTimeout) {
                if (info.currentJob) this._requeueJob(info.currentJob);
                info.worker.kill('SIGTERM');
                this.workers.delete(id);
                this._spawnWorker();
            }
        }
    }

    submitJob(jobData) {
        const jobId = `job-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
        this.jobs.set(jobId, { id: jobId, data: jobData, status: 'pending', retries: 0 });
        for (const [id, info] of this.workers) {
            if (!info.currentJob) {
                this._dispatchJob(id, jobId);
                return jobId;
            }
        }
        return jobId;
    }

    _dispatchJob(workerId, jobId) {
        const info = this.workers.get(workerId);
        info.currentJob = jobId;
        const job = this.jobs.get(jobId);
        job.status = 'running'; job.workerId = workerId;
        info.worker.send({ type: 'execute', jobId, data: job.data });
    }

    _startAgent() {
        process.on('message', async (msg) => {
            if (msg.type === 'execute') {
                try {
                    const heartbeat = setInterval(() => {
                        process.send({ type: 'heartbeat', jobId: msg.jobId });
                    }, 3000);
                    const result = await this.executeJob(msg.data);
                    clearInterval(heartbeat);
                    process.send({ type: 'complete', jobId: msg.jobId, result });
                } catch (error) {
                    process.send({ type: 'failed', jobId: msg.jobId, error: error.message });
                }
            }
        });
    }

    async executeJob(data) {
        await new Promise(r => setTimeout(r, 1000));
        return { processed: true, data };
    }
}

Explicacion

El patron separa responsabilidades en tres capas:

  • Programador: Decide que se ejecuta, cuando y donde. Mantiene la cola de trabajos, orden de prioridades y politicas de reintento. No ejecuta trabajo directamente.
  • Agente: Ejecuta los trabajos asignados y reporta progreso. Los agentes son desechables — si uno falla, el supervisor lo reemplaza. Los agentes deben ser sin estado; todo el estado del trabajo vive en la tienda de trabajos del programador.
  • Supervisor: Observa agentes via heartbeats, detecta fallas, reinicia agentes y redistribuye trabajos fallidos. Es la capa de resiliencia que hace al sistema auto-reparable.

La idea clave es que los agentes son ganado, no mascotas. El supervisor los trata como recursos efimeros que pueden crearse, destruirse y reemplazarse sin afectar el sistema.

Variantes

VarianteMecanismo de SupervisorIdeal Para
Erlang OTPArboles de supervision, reinicio one-for-oneTelecom, sistemas soft real-time
KubernetesReplicaSets, health checks, reinicios rollingMicroservicios contenedorizados
AWS Step FunctionsMaquina de estados con manejo de erroresFlujos de trabajo nativos en la nube
Celery con FlowerMonitoreo de workers, control remotoColas de tareas Python
Implementacion personalizadaMonitoreo directo de procesosSistemas embebidos, edge computing

Mejores Practicas

  • Hacer los agentes sin estado
  • Usar heartbeats con timeouts
  • Implementar backoff exponencial para reintentos
  • Limitar el numero de reintentos
  • Escalar los supervisores tambien

Errores Comunes

  • Agentes con estado
  • Heartbeats faltantes
  • Supervisor como punto unico de falla
  • Sin circuit breaker para trabajos fallidos
  • Ignorar el tiempo de inicio del agente

Ejemplos del Mundo Real

  • Erlang/OTP: Marco pionero del patron de supervision. Los procesos se organizan en arboles de supervision donde los supervisores monitorean workers y aplican estrategias de reinicio. Impulsa la infraestructura de mensajeria de WhatsApp.
  • Kubernetes: Los controladores actuan como supervisores. Un controlador de Deployment monitorea Pods (agentes), detecta los no saludables via health checks, y crea reemplazos.
  • Apache Airflow: El modelo de executor de Airflow usa un programador que analiza DAGs y coloca tareas en workers (agentes). El programador monitorea el estado en la base de datos de metadatos y reintenta tareas fallidas.

Preguntas Frecuentes

P: ¿Cual es la diferencia con una simple cola de tareas con workers? R: Una cola de tareas delega ejecucion pero no monitorea activamente la salud de los workers ni reemplaza automaticamente los fallidos. El supervisor agrega la capa de gestion de ciclo de vida y auto-reparacion.

P: ¿El programador y el supervisor deberian ser el mismo proceso? R: Pueden serlo, pero separarlos mejora la resiliencia. Si el programador falla, el supervisor puede aun mantener los agentes existentes.

P: ¿Como se relaciona este patron con Kubernetes? R: Kubernetes implementa el patron directamente: kube-scheduler decide la colocacion (programador), los Pods ejecutan trabajo (agentes), y ReplicaSets/Deployments monitorean y reemplazan Pods fallidos (supervisor).

P: ¿Que pasa si el supervisor falla? R: El sistema pierde supervision. Mitigar ejecutando supervisores en pares con eleccion de lider, o usando un sistema de consenso distribuido.

P: ¿Puede este patron manejar millones de trabajos? R: Si — pero el programador se convierte en cuello de botella. Usar programacion particionada (un programador por tipo de trabajo o shard) y pools de agentes de nada compartida para escalar horizontalmente.