Referencia API
Event API

Event API

Sistema de eventos: local, distribuido, Event Sourcing y CQRS (resumen avanzado).

Índice

  • Conceptos Base
  • Arquitectura
  • Configuración (Memory / Redis / Custom)
  • Modelo de Evento y Metadata
  • Emisión & Suscripción
  • Event Store & Concurrencia Optimista
  • Reconstrucción y Snapshots
  • Projections & Read Models
  • CQRS (Command / Query Flow)
  • Sagas / Process Managers
  • Particionado & Escalado
  • Idempotencia & Replays Seguros
  • Correlation & Causation IDs
  • Dead Letter Queue (DLQ) & Retries
  • Versionado de Eventos y Migraciones
  • Performance & Backpressure
  • Seguridad (Validación / Hardening)
  • Testing Estrategias
  • Buenas Prácticas
  • Anti‑Patrones
  • Checklist
  • Roadmap Futuro

Nota: Si esta página supera límites de tamaño en build (warnings Babel >500KB), considerar segmentar en subpáginas (/event-store, /event-bus, /cqrs, /sagas) para reducir payload.

Conceptos Base

  • Event Store: persiste eventos (append-only)
  • Event Bus: distribución (in-memory / Redis / otros)
  • Handlers: reaccionan a eventos
  • Projections: vistas materializadas
  • Sagas / Process Managers: orquestación de workflows

Arquitectura

Command -> Domain -> Eventos (append) -> Event Store --> (emit local) --> Handlers (Projections/Sagas)
                                                   \-> Event Bus Distribuido -> Otros Servicios
Replay -> reconstruye agregados / reprocesa proyecciones

Componentes:

ComponenteRolPersistencia
EventStoreAppend-only, control versión
EventBusPub/Sub local + adapter distribuidoNo (bus)
ProjectionsVistas materializadasExterna (DB optimizada)
SagasOrquestación multi-eventoPersistente (estado saga)

Configuración

import { EventSystemFactory } from 'fox-framework';
 
// Desarrollo (todo in-memory)
const es = EventSystemFactory.createMemorySystem();
 
// Producción (Redis bus, store memory temporal)
const prod = EventSystemFactory.createFromConfig({
  store: { type:'memory' },
  bus: { adapter:'redis', connection:{ host:'redis', port:6379 }, partitions:4 },
  performance:{ batchSize:50, bufferSize:1000 },
  errorHandling:{ retryFailedMessages:true, maxRetries:3, deadLetterQueue:true }
});

Modelo de Evento y Metadata

interface EventInterface {
  id: string;
  type: string;            // domain.subdomain.action
  aggregateId?: string;
  version?: number;        // control de concurrencia
  data: any;               // payload inmutable
  metadata?: {
    correlationId?: string;
    causationId?: string;
    source?: string;
    tenantId?: string;
    traceId?: string;
  };
  timestamp?: Date;
}

Reglas:

  • type estable (no cambiar significado)
  • data mínima necesaria
  • metadata para trazabilidad & multi‑tenancy

Emisión & Suscripción

es.on('order.paid', async evt => { /* actualizar proyección */ });
await es.emit({ id:uid(), type:'order.paid', aggregateId:'order_1', data:{ total:100 }, metadata:{ correlationId:req.id } });

Wildcard según implementación (order.*).

Event Store & Concurrencia Optimista

Al hacer append se valida versión esperada:

await store.append('order_1', [ { id:'e3', type:'order.shipped', version:3, data:{ tracking:'X' } } ]);

Si versión no coincide -> conflicto (retry después de reconstruir estado).

Reconstrucción y Snapshots

async function loadAggregate(id){
  const history = await store.read(id);
  if (history.length > 200) { /* snapshot heurística */ }
  return replay(history);
}

Snapshot pattern (futuro): guardar cada N eventos resumen de estado para acelerar replay.

Projections & Read Models

Principios:

  • Deterministas
  • Idempotentes
  • Eventual consistency aceptada
es.on('user.registered', async evt => {
  await userReadModel.upsert({ id:evt.aggregateId, email:evt.data.email, createdAt:evt.timestamp });
});

Rebuild: vaciar tabla + replay eventos (offline maintenance).

CQRS (Flujo)

await commandBus.execute(new RegisterUserCommand(...));
const profile = await queryBus.execute(new GetUserProfileQuery(userId));

Comandos generan eventos -> Projections actualizan read models -> Queries consultan read DB.

Sagas / Process Managers

Orquestan flujos multi-agregado.

es.on('order.paid', async evt => {
  await es.emit({ id:uid(), type:'shipment.requested', aggregateId:evt.aggregateId, data:{ address:evt.data.address } });
});

Persistir estado saga (step actual, timeouts) para tolerar reinicios.

Particionado & Escalado

  • Partición por aggregateId hash -> preserva orden por stream
  • Consumidores paralelos por partición
  • Evitar fan-out explosivo (usar eventos derivados agregados)

Idempotencia & Replays Seguros

Handlers deben poder procesar evento duplicado sin efectos secundarios. Patrones:

  • Guardar lastEventVersion procesada por proyección
  • Upserts en vez de inserts ciegos
  • Usar event.id en tabla de deduplicación si side-effect externo (email, pago)

Correlation & Causation IDs

correlationId: cadena de eventos iniciados por una entrada. causationId: evento/comando inmediato que causó el nuevo evento. Útiles para tracing distribuido y debugging.

Dead Letter Queue (DLQ) & Retries

Política:

IntentosAcción
< maxRetriesReintentar exponencial
== maxRetriesMover a DLQ
DLQ procesadoAnalizar, fix, re‑enqueue

Versionado de Eventos y Migraciones

Estrategias:

  1. Evolución compatible (añadir campos opcionales)
  2. Evento nuevo (user.name_corrected) en vez de mutar antiguo
  3. Proyección migradora que re-emite eventos transformados para rebuild Nunca sobrescribir eventos históricos.

Performance & Backpressure

AspectoEstrategia
Lote (batch append)Reducir I/O persistencia
Buffer internoAislar picos de productores
ParticionesParalelismo controlado
BackpressureRechazar o degradar productores si buffer lleno
Compactación (future)Snapshots + archivado cold storage

Seguridad (Validación / Hardening)

  • Validar schema de data antes de append
  • Limitar tamaño (ej: <16KB por evento)
  • Sanitizar PII (o tokenizar)
  • Controlar multi‑tenant via metadata.tenantId
  • Firmar eventos críticos (futuro) / checksum

Testing Estrategias

TipoObjetivoTécnica
Unit (handlers)Lógica deterministaFeed eventos mock y assert estado
StoreConcurrencia y versiónSimular conflicto versión
ProjectionsIdempotenciaReprocesar mismo evento N veces
SagasFlujo multi-eventoSimular secuencia happy-path y fallos intermedios
ReplayIntegridad rebuildVaciar read model y replay

Buenas Prácticas

  • Tipar eventos (union types) para seguridad de compilación
  • Mantener payloads pequeños y semánticos
  • Evitar emitir eventos redundantes (ruido)
  • Usar correlationId en todos los eventos emitidos
  • Documentar contrato de tipos de eventos públicos

Anti‑Patrones

SituaciónRiesgo
Eventos mutablesRompe inmutabilidad y auditoría
Monolito de handler giganteDifícil de mantener / testear
Eventos genéricos tipo updatedPobre semántica, difícil auditoría
Lógica de negocio en proyecciónComplica rebuild y test
Reprocesar sin idempotenciaDuplicación de side-effects

Checklist

  • Eventos inmutables & versionados
  • Handlers idempotentes
  • correlationId y causationId presentes
  • Estrategia de particionado definida
  • Políticas de retry + DLQ
  • Validación de schema aplicada
  • Plan de snapshots (si > N eventos/stream)
  • Documentación de tipos publicada

Roadmap Futuro

  • Snapshots automáticos configurables
  • Generador de tipos a partir de esquemas
  • Compresión y archivado histórico
  • Bus adaptadores (NATS / Kafka)
  • Auditoría y métricas (events_processed_total)

Diseña eventos con semántica clara: facilitan auditoría, evolución y depuración en sistemas distribuidos.