Implementar event sourcing en una base de datos relacional
Construye sistemas de event sourcing usando bases de datos relacionales con event stores, proyecciones y snapshotting para auditoría y consultas temporales.
Implementar event sourcing en una base de datos relacional
Visión General
Event sourcing almacena cambios de estado como una secuencia de eventos inmutables en lugar de sobrescribir el estado actual. En lugar de guardar balance = 100, registras Deposited $50 y Deposited $50. El estado actual se deriva reproduciendo todos los eventos. Esto provee un audit trail completo, consultas temporales y la capacidad de reconstruir el estado en cualquier punto del tiempo.
Esta receta implementa un event store, proyecciones (read models) y snapshotting usando PostgreSQL, MySQL y SQL Server.
Cuándo Usar
Usa este recurso cuando:
- Necesitas un audit trail completo de todos los cambios de estado (finanzas, cumplimiento)
- Se requieren consultas temporales: “¿Cuál era el nivel de inventario hace 30 días?”
- Quieres desacoplar modelos de escritura y lectura (CQRS)
- Reconstruir read models desde cero es preferible a migraciones de esquema complejas
Solución
Python (PostgreSQL)
import json
from datetime import datetime
from uuid import uuid4
class EventStore:
def __init__(self, conn):
self.conn = conn
def append(self, aggregate_id, event_type, payload, expected_version=None):
with self.conn.cursor() as cur:
# Verificación de concurrencia optimista
cur.execute(
"SELECT COUNT(*) FROM events WHERE aggregate_id = %s",
(aggregate_id,)
)
current_version = cur.fetchone()[0]
if expected_version is not None and current_version != expected_version:
raise ConcurrencyException(f"Expected {expected_version}, found {current_version}")
cur.execute("""
INSERT INTO events (id, aggregate_id, event_type, payload, version, occurred_at)
VALUES (%s, %s, %s, %s, %s, %s)
""", (str(uuid4()), aggregate_id, event_type, json.dumps(payload),
current_version + 1, datetime.utcnow()))
self.conn.commit()
def get_events(self, aggregate_id):
with self.conn.cursor() as cur:
cur.execute("""
SELECT event_type, payload, version, occurred_at
FROM events WHERE aggregate_id = %s ORDER BY version
""", (aggregate_id,))
return [{
"type": row[0], "payload": json.loads(row[1]),
"version": row[2], "occurred_at": row[3]
} for row in cur.fetchall()]
# Proyección (read model)
def rebuild_account_balance(conn, account_id):
store = EventStore(conn)
events = store.get_events(account_id)
balance = 0
for event in events:
if event["type"] == "Deposit":
balance += event["payload"]["amount"]
elif event["type"] == "Withdrawal":
balance -= event["payload"]["amount"]
return balance
JavaScript (MySQL)
const { v4: uuidv4 } = require('uuid');
class EventStore {
constructor(pool) {
this.pool = pool;
}
async append(aggregateId, eventType, payload, expectedVersion = null) {
const conn = await this.pool.getConnection();
try {
await conn.beginTransaction();
const [rows] = await conn.execute(
'SELECT COUNT(*) as count FROM events WHERE aggregate_id = ?',
[aggregateId]
);
const currentVersion = rows[0].count;
if (expectedVersion !== null && currentVersion !== expectedVersion) {
throw new Error(`Conflicto de concurrencia: expected ${expectedVersion}`);
}
await conn.execute(
`INSERT INTO events (id, aggregate_id, event_type, payload, version, occurred_at)
VALUES (?, ?, ?, ?, ?, NOW())`,
[uuidv4(), aggregateId, eventType, JSON.stringify(payload), currentVersion + 1]
);
await conn.commit();
} finally {
conn.release();
}
}
async getEvents(aggregateId) {
const [rows] = await this.pool.execute(
`SELECT event_type, payload, version, occurred_at
FROM events WHERE aggregate_id = ? ORDER BY version`,
[aggregateId]
);
return rows.map(r => ({
type: r.event_type,
payload: JSON.parse(r.payload),
version: r.version,
occurredAt: r.occurred_at
}));
}
}
// Snapshot para evitar reproducir todos los eventos
async function getBalanceWithSnapshot(pool, accountId) {
const [snapshots] = await pool.execute(
'SELECT * FROM snapshots WHERE aggregate_id = ? ORDER BY version DESC LIMIT 1',
[accountId]
);
let balance = 0;
let fromVersion = 0;
if (snapshots.length > 0) {
balance = snapshots[0].state.balance;
fromVersion = snapshots[0].version;
}
const store = new EventStore(pool);
const events = await store.getEvents(accountId);
const newEvents = events.filter(e => e.version > fromVersion);
for (const event of newEvents) {
if (event.type === 'Deposit') balance += event.payload.amount;
if (event.type === 'Withdrawal') balance -= event.payload.amount;
}
return balance;
}
Java (SQL Server con Spring)
@Entity
@Table(name = "events")
public class EventEntity {
@Id private UUID id;
private UUID aggregateId;
private String eventType;
@Column(columnDefinition = "nvarchar(max)")
private String payload;
private int version;
private Instant occurredAt;
}
@Service
public class EventStore {
@Autowired private EventRepository repo;
@Transactional
public void append(UUID aggregateId, String eventType, String payload, Integer expectedVersion) {
int currentVersion = repo.countByAggregateId(aggregateId);
if (expectedVersion != null && currentVersion != expectedVersion) {
throw new ConcurrencyException("Expected " + expectedVersion);
}
EventEntity event = new EventEntity();
event.setId(UUID.randomUUID());
event.setAggregateId(aggregateId);
event.setEventType(eventType);
event.setPayload(payload);
event.setVersion(currentVersion + 1);
event.setOccurredAt(Instant.now());
repo.save(event);
}
public List<EventEntity> getEvents(UUID aggregateId) {
return repo.findByAggregateIdOrderByVersionAsc(aggregateId);
}
}
// Servicio de snapshot
@Service
public class SnapshotService {
@Autowired private EventStore eventStore;
@Autowired private SnapshotRepository snapshotRepo;
public AccountState rebuildState(UUID accountId) {
Optional<Snapshot> snapshot = snapshotRepo
.findTopByAggregateIdOrderByVersionDesc(accountId);
int startVersion = snapshot.map(Snapshot::getVersion).orElse(0);
AccountState state = snapshot.map(Snapshot::getState)
.orElse(new AccountState(0));
List<EventEntity> events = eventStore.getEvents(accountId).stream()
.filter(e -> e.getVersion() > startVersion)
.collect(Collectors.toList());
for (EventEntity event : events) {
state = applyEvent(state, event);
}
return state;
}
}
Explicación
Event sourcing invierte el CRUD tradicional: en lugar de almacenar el estado actual, almacenas el historial de cambios. Conceptos clave:
- Event store: Un log solo de append de eventos de dominio
- Aggregate: El límite de consistencia; cada aggregate tiene su propio stream de eventos
- Proyección: Un read model derivado construido reproduciendo eventos
- Snapshot: Una captura periódica de estado para evitar reproducir miles de eventos
El esquema de base de datos relacional es simple: una tabla events con aggregate_id, event_type, payload (JSON), version y occurred_at.
Variantes
| Almacenamiento | Flexibilidad de esquema | Velocidad de query | Ideal para |
|---|---|---|---|
| PostgreSQL + JSONB | Alta | Media | Propósito general, soporte JSON rico |
| MySQL + JSON | Alta | Media | Infraestructura MySQL existente |
| SQL Server | Media | Rápida | Empresarial, proyecciones T-SQL |
| Dedicado (EventStoreDB) | Nativa | Muy rápida | Event sourcing a gran escala |
Mejores Prácticas
- Versiona cada evento: El control de concurrencia optimista previene actualizaciones perdidas
- Usa JSONB/JSON para payloads: Flexibilidad de esquema sin migraciones; valida en la capa de aplicación
- Crea snapshots cada N eventos: Balance entre almacenamiento y rendimiento de reproducción
- Mantén los eventos pequeños: Payloads grandes ralentizan la reproducción y aumentan el almacenamiento
- Separa proyecciones del event store: Las proyecciones pueden reconstruirse; los eventos son la fuente de verdad
Errores Comunes
- No versionar eventos: Sin números de versión no puedes detectar modificaciones concurrentes
- Almacenar estado actual Y eventos: Esto crea escrituras duales y riesgos de consistencia
- Reproducir todos los eventos en cada lectura: Usa snapshots o tablas de proyección dedicadas
- Eventos mutables: Los eventos deben ser inmutables — nunca actualices o borres eventos históricos
- Falta de evolución de esquema de eventos: Eventos antiguos necesitan estrategias de migración a medida que el modelo de dominio cambia
Preguntas Frecuentes
P: ¿No consume event sourcing demasiado almacenamiento? R: Los eventos son típicamente pequeños (cientos de bytes). Para un sistema con 1M transacciones/día, eso es ~100MB/día. Con compresión y archivado, los costos de almacenamiento son usualmente insignificantes comparados con el valor de auditoría.
P: ¿Cómo manejo cambios de esquema en eventos?
R: Usa versionamiento de eventos (Deposit_v1, Deposit_v2) o upcasting — transforma eventos antiguos al nuevo esquema durante la reproducción. Nunca modifiques eventos almacenados.
P: ¿Puedo usar event sourcing con CQRS? R: Sí — CQRS y event sourcing se complementan naturalmente. Los comandos agregan eventos al modelo de escritura; las proyecciones crean read models optimizados. El read model puede estar en una base de datos completamente diferente (Elasticsearch, Redis, etc.).
Recursos Relacionados
Handle Database Deadlocks and Retries
Detect, prevent, and recover from database deadlocks with automatic retry logic, isolation levels, and query ordering strategies.
RecipeSet Up Database Read Replicas for Scaling
Scale read-heavy workloads with database read replicas, replication lag monitoring, and read/write splitting across primary and replica instances.
RecipeFull-Text Search
How to implement full-text search with Elasticsearch, Meilisearch, and PostgreSQL.
PatternEvent Sourcing Pattern
Store the state of an application as a sequence of events rather than storing only the current state. An architectural pattern for audit-friendly systems.
DocDatabase Migration Runbook Template
A database migration runbook template for executing schema changes safely with rollback procedures, verification steps, and communication plans.