Event API
Sistema de eventos: local, distribuido, Event Sourcing y CQRS (resumen avanzado).
Índice
- Conceptos Base
- Arquitectura
- Configuración (Memory / Redis / Custom)
- Modelo de Evento y Metadata
- Emisión & Suscripción
- Event Store & Concurrencia Optimista
- Reconstrucción y Snapshots
- Projections & Read Models
- CQRS (Command / Query Flow)
- Sagas / Process Managers
- Particionado & Escalado
- Idempotencia & Replays Seguros
- Correlation & Causation IDs
- Dead Letter Queue (DLQ) & Retries
- Versionado de Eventos y Migraciones
- Performance & Backpressure
- Seguridad (Validación / Hardening)
- Testing Estrategias
- Buenas Prácticas
- Anti‑Patrones
- Checklist
- Roadmap Futuro
Nota: Si esta página supera límites de tamaño en build (warnings Babel >500KB), considerar segmentar en subpáginas (
/event-store,/event-bus,/cqrs,/sagas) para reducir payload.
Conceptos Base
- Event Store: persiste eventos (append-only)
- Event Bus: distribución (in-memory / Redis / otros)
- Handlers: reaccionan a eventos
- Projections: vistas materializadas
- Sagas / Process Managers: orquestación de workflows
Arquitectura
Command -> Domain -> Eventos (append) -> Event Store --> (emit local) --> Handlers (Projections/Sagas)
\-> Event Bus Distribuido -> Otros Servicios
Replay -> reconstruye agregados / reprocesa proyeccionesComponentes:
| Componente | Rol | Persistencia |
|---|---|---|
| EventStore | Append-only, control versión | Sí |
| EventBus | Pub/Sub local + adapter distribuido | No (bus) |
| Projections | Vistas materializadas | Externa (DB optimizada) |
| Sagas | Orquestación multi-evento | Persistente (estado saga) |
Configuración
import { EventSystemFactory } from 'fox-framework';
// Desarrollo (todo in-memory)
const es = EventSystemFactory.createMemorySystem();
// Producción (Redis bus, store memory temporal)
const prod = EventSystemFactory.createFromConfig({
store: { type:'memory' },
bus: { adapter:'redis', connection:{ host:'redis', port:6379 }, partitions:4 },
performance:{ batchSize:50, bufferSize:1000 },
errorHandling:{ retryFailedMessages:true, maxRetries:3, deadLetterQueue:true }
});Modelo de Evento y Metadata
interface EventInterface {
id: string;
type: string; // domain.subdomain.action
aggregateId?: string;
version?: number; // control de concurrencia
data: any; // payload inmutable
metadata?: {
correlationId?: string;
causationId?: string;
source?: string;
tenantId?: string;
traceId?: string;
};
timestamp?: Date;
}Reglas:
typeestable (no cambiar significado)datamínima necesariametadatapara trazabilidad & multi‑tenancy
Emisión & Suscripción
es.on('order.paid', async evt => { /* actualizar proyección */ });
await es.emit({ id:uid(), type:'order.paid', aggregateId:'order_1', data:{ total:100 }, metadata:{ correlationId:req.id } });Wildcard según implementación (order.*).
Event Store & Concurrencia Optimista
Al hacer append se valida versión esperada:
await store.append('order_1', [ { id:'e3', type:'order.shipped', version:3, data:{ tracking:'X' } } ]);Si versión no coincide -> conflicto (retry después de reconstruir estado).
Reconstrucción y Snapshots
async function loadAggregate(id){
const history = await store.read(id);
if (history.length > 200) { /* snapshot heurística */ }
return replay(history);
}Snapshot pattern (futuro): guardar cada N eventos resumen de estado para acelerar replay.
Projections & Read Models
Principios:
- Deterministas
- Idempotentes
- Eventual consistency aceptada
es.on('user.registered', async evt => {
await userReadModel.upsert({ id:evt.aggregateId, email:evt.data.email, createdAt:evt.timestamp });
});Rebuild: vaciar tabla + replay eventos (offline maintenance).
CQRS (Flujo)
await commandBus.execute(new RegisterUserCommand(...));
const profile = await queryBus.execute(new GetUserProfileQuery(userId));Comandos generan eventos -> Projections actualizan read models -> Queries consultan read DB.
Sagas / Process Managers
Orquestan flujos multi-agregado.
es.on('order.paid', async evt => {
await es.emit({ id:uid(), type:'shipment.requested', aggregateId:evt.aggregateId, data:{ address:evt.data.address } });
});Persistir estado saga (step actual, timeouts) para tolerar reinicios.
Particionado & Escalado
- Partición por
aggregateIdhash -> preserva orden por stream - Consumidores paralelos por partición
- Evitar fan-out explosivo (usar eventos derivados agregados)
Idempotencia & Replays Seguros
Handlers deben poder procesar evento duplicado sin efectos secundarios. Patrones:
- Guardar
lastEventVersionprocesada por proyección - Upserts en vez de inserts ciegos
- Usar
event.iden tabla de deduplicación si side-effect externo (email, pago)
Correlation & Causation IDs
correlationId: cadena de eventos iniciados por una entrada.
causationId: evento/comando inmediato que causó el nuevo evento.
Útiles para tracing distribuido y debugging.
Dead Letter Queue (DLQ) & Retries
Política:
| Intentos | Acción |
|---|---|
< maxRetries | Reintentar exponencial |
== maxRetries | Mover a DLQ |
| DLQ procesado | Analizar, fix, re‑enqueue |
Versionado de Eventos y Migraciones
Estrategias:
- Evolución compatible (añadir campos opcionales)
- Evento nuevo (
user.name_corrected) en vez de mutar antiguo - Proyección migradora que re-emite eventos transformados para rebuild Nunca sobrescribir eventos históricos.
Performance & Backpressure
| Aspecto | Estrategia |
|---|---|
| Lote (batch append) | Reducir I/O persistencia |
| Buffer interno | Aislar picos de productores |
| Particiones | Paralelismo controlado |
| Backpressure | Rechazar o degradar productores si buffer lleno |
| Compactación (future) | Snapshots + archivado cold storage |
Seguridad (Validación / Hardening)
- Validar schema de
dataantes de append - Limitar tamaño (ej:
<16KBpor evento) - Sanitizar PII (o tokenizar)
- Controlar multi‑tenant via
metadata.tenantId - Firmar eventos críticos (futuro) / checksum
Testing Estrategias
| Tipo | Objetivo | Técnica |
|---|---|---|
| Unit (handlers) | Lógica determinista | Feed eventos mock y assert estado |
| Store | Concurrencia y versión | Simular conflicto versión |
| Projections | Idempotencia | Reprocesar mismo evento N veces |
| Sagas | Flujo multi-evento | Simular secuencia happy-path y fallos intermedios |
| Replay | Integridad rebuild | Vaciar read model y replay |
Buenas Prácticas
- Tipar eventos (union types) para seguridad de compilación
- Mantener payloads pequeños y semánticos
- Evitar emitir eventos redundantes (ruido)
- Usar correlationId en todos los eventos emitidos
- Documentar contrato de tipos de eventos públicos
Anti‑Patrones
| Situación | Riesgo |
|---|---|
| Eventos mutables | Rompe inmutabilidad y auditoría |
| Monolito de handler gigante | Difícil de mantener / testear |
Eventos genéricos tipo updated | Pobre semántica, difícil auditoría |
| Lógica de negocio en proyección | Complica rebuild y test |
| Reprocesar sin idempotencia | Duplicación de side-effects |
Checklist
- Eventos inmutables & versionados
- Handlers idempotentes
- correlationId y causationId presentes
- Estrategia de particionado definida
- Políticas de retry + DLQ
- Validación de schema aplicada
- Plan de snapshots (si > N eventos/stream)
- Documentación de tipos publicada
Roadmap Futuro
- Snapshots automáticos configurables
- Generador de tipos a partir de esquemas
- Compresión y archivado histórico
- Bus adaptadores (NATS / Kafka)
- Auditoría y métricas (
events_processed_total)
Diseña eventos con semántica clara: facilitan auditoría, evolución y depuración en sistemas distribuidos.