Sistema de Eventos
Este ejemplo muestra cómo implementar un sistema de eventos completo con Event Sourcing, CQRS, mensajería distribuida e integración con diferentes proveedores como Redis, RabbitMQ y Kafka.
Event Store y Event Sourcing
// src/events/event-store.ts
import { EventStore, Event, EventProvider } from '@foxframework/core/events';
/**
 * An event recorded for a single aggregate.
 *
 * `FoxEventStore.appendEvent` accepts this shape without `eventId` and
 * `timestamp` and assigns both itself before storing the event.
 */
export interface DomainEvent {
// Identifier assigned when the event is appended.
eventId: string;
// Identifier of the aggregate the event belongs to.
aggregateId: string;
// Kind of aggregate, e.g. 'User'.
aggregateType: string;
// Event name, e.g. 'UserCreated'.
eventType: string;
// Version of the aggregate after this event is applied.
eventVersion: number;
// Event-specific payload; its shape depends on `eventType`.
eventData: any;
// Optional free-form context about the event.
metadata?: Record<string, any>;
// Time assigned when the event is appended.
timestamp: Date;
// Optional user identifier associated with the event.
userId?: string;
}
export class FoxEventStore {
private eventStore: EventStore;
/**
 * Creates the underlying `EventStore` for the selected provider.
 *
 * @param provider Storage backend to use; defaults to `'memory'`.
 */
constructor(provider: 'memory' | 'redis' | 'postgresql' | 'mongodb' = 'memory') {
  const connection = this.getConnectionConfig(provider);
  const options = {
    snapshotFrequency: 10,
    enableSnapshots: true,
    retentionDays: 365
  };
  this.eventStore = new EventStore({ provider, connection, options });
}
/**
 * Resolves the connection settings for `provider` from environment variables.
 *
 * Only `'redis'` and `'postgresql'` have settings here; any other provider
 * receives an empty object.
 * NOTE(review): `'mongodb'` is accepted by the constructor but has no case
 * here — confirm that an empty connection object is valid for it.
 *
 * @param provider Name of the storage backend.
 * @returns Connection settings for the backend, or `{}`.
 */
private getConnectionConfig(provider: string) {
  // A missing or non-numeric port falls back to the default instead of NaN.
  const portFromEnv = (raw: string | undefined, fallback: number): number => {
    const parsed = Number.parseInt(raw ?? '', 10);
    return Number.isNaN(parsed) ? fallback : parsed;
  };

  switch (provider) {
    case 'redis':
      return {
        host: process.env.REDIS_HOST || 'localhost',
        port: portFromEnv(process.env.REDIS_PORT, 6379),
        database: process.env.REDIS_EVENTS_DB || '1'
      };
    case 'postgresql':
      return {
        host: process.env.DB_HOST || 'localhost',
        port: portFromEnv(process.env.DB_PORT, 5432),
        database: process.env.DB_NAME || 'fox_events',
        username: process.env.DB_USER || 'postgres',
        // NOTE(review): hard-coded fallback password; acceptable only for local development.
        password: process.env.DB_PASS || 'password'
      };
    default:
      return {};
  }
}
/**
 * Assigns an id and timestamp to `event` and appends it to the stream of its aggregate.
 *
 * @param event Event data without `eventId` and `timestamp`; both are set
 *   here and override any same-named properties present on the argument.
 */
async appendEvent(event: Omit<DomainEvent, 'eventId' | 'timestamp'>): Promise<void> {
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  const eventId = `evt-${Date.now()}-${randomSuffix}`;
  const stored: DomainEvent = { ...event, eventId, timestamp: new Date() };

  await this.eventStore.append(stored.aggregateId, [stored]);
  console.log(`📝 Event stored: ${stored.eventType} for ${stored.aggregateType}:${stored.aggregateId}`);
}
/**
 * Returns the events stored for one aggregate.
 *
 * @param aggregateId Aggregate whose events are read.
 * @param fromVersion Optional version bound, forwarded unchanged to the underlying store.
 */
async getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]> {
return this.eventStore.getEvents(aggregateId, fromVersion);
}
/**
 * Returns stored events of the given type.
 *
 * @param eventType Event name to look up.
 * @param limit Optional limit, forwarded unchanged to the underlying store.
 */
async getEventsByType(eventType: string, limit?: number): Promise<DomainEvent[]> {
return this.eventStore.getEventsByType(eventType, limit);
}
/**
 * Delegates to the underlying store's `replay` for one aggregate.
 *
 * @param aggregateId Aggregate to replay.
 * @param toVersion Optional version bound, forwarded unchanged to the underlying store.
 */
async replay(aggregateId: string, toVersion?: number): Promise<DomainEvent[]> {
return this.eventStore.replay(aggregateId, toVersion);
}
/**
 * Saves a snapshot of an aggregate.
 *
 * @param aggregateId Aggregate the snapshot belongs to.
 * @param data State to store.
 * @param version Aggregate version the snapshot corresponds to.
 */
async createSnapshot(aggregateId: string, data: any, version: number): Promise<void> {
await this.eventStore.saveSnapshot(aggregateId, data, version);
}
/**
 * Returns whatever the underlying store holds as the snapshot of `aggregateId`.
 * Callers in this file treat a falsy result as "no snapshot" and read
 * `.data` and `.version` from a truthy one.
 */
async getSnapshot(aggregateId: string): Promise<any> {
return this.eventStore.getSnapshot(aggregateId);
}
}
CQRS Implementation
// src/cqrs/command-handler.ts
/** A request to change the state of one aggregate. */
export interface Command {
  commandId: string;
  aggregateId: string;
  /** Key used by `CommandBus` to find the handler. */
  commandType: string;
  payload: any;
  metadata?: Record<string, any>;
  timestamp: Date;
  userId?: string;
}

/** Handles every command of one `commandType`. */
export interface CommandHandler<T extends Command> {
  handle(command: T): Promise<void>;
}

/** Routes each command to the single handler registered for its type. */
export class CommandBus {
  private handlers = new Map<string, CommandHandler<any>>();

  /**
   * Registers `handler` for `commandType`, replacing any handler
   * previously registered under the same type.
   */
  register<T extends Command>(commandType: string, handler: CommandHandler<T>): void {
    this.handlers.set(commandType, handler);
    console.log(`📋 Command handler registered: ${commandType}`);
  }

  /**
   * Runs the handler registered for `command.commandType`.
   *
   * @throws Error when no handler is registered for that type.
   */
  async execute<T extends Command>(command: T): Promise<void> {
    const { commandType } = command;
    const target = this.handlers.get(commandType);

    if (target) {
      console.log(`⚡ Executing command: ${commandType} for ${command.aggregateId}`);
      await target.handle(command);
      return;
    }

    throw new Error(`No handler registered for command: ${commandType}`);
  }
}
// src/cqrs/query-handler.ts
/** A read request; `queryType` is the key `QueryBus` uses to find the handler. */
export interface Query {
queryId: string;
queryType: string;
// Query-specific arguments.
parameters: Record<string, any>;
timestamp: Date;
userId?: string;
}
/** Handles every query of one `queryType` and produces a result of type `R`. */
export interface QueryHandler<T extends Query, R> {
handle(query: T): Promise<R>;
}
export class QueryBus {
private handlers = new Map<string, QueryHandler<any, any>>();
/**
 * Registers `handler` for `queryType`, replacing any handler previously
 * registered under the same type.
 */
register<T extends Query, R>(queryType: string, handler: QueryHandler<T, R>): void {
  const registry = this.handlers;
  registry.set(queryType, handler);
  console.log(`🔍 Query handler registered: ${queryType}`);
}
/**
 * Runs the handler registered for `query.queryType` and returns its result.
 *
 * @throws Error when no handler is registered for that type.
 */
async execute<T extends Query, R>(query: T): Promise<R> {
  const { queryType } = query;
  const target = this.handlers.get(queryType);

  if (target) {
    console.log(`🔎 Executing query: ${queryType}`);
    return target.handle(query);
  }

  throw new Error(`No handler registered for query: ${queryType}`);
}
}
Domain Aggregates con Event Sourcing
// src/domain/user-aggregate.ts
import { FoxEventStore, DomainEvent } from '../events/event-store';
/** In-memory state of a user, rebuilt by applying its events in order. */
export interface UserState {
// Aggregate id; an empty string until an id is supplied or a 'UserCreated' event is applied.
id: string;
email: string;
firstName: string;
lastName: string;
// Set to true by 'UserCreated' and to false by 'UserDeactivated'.
isActive: boolean;
// Set to true by 'EmailVerified'.
emailVerified: boolean;
createdAt: Date;
// `eventVersion` of the last applied event; 0 when no event has been applied.
version: number;
}
export class UserAggregate {
private state: UserState;
private eventStore: FoxEventStore;
private uncommittedEvents: DomainEvent[] = [];
/**
 * Creates an aggregate with blank state at version 0.
 *
 * @param eventStore Store used to persist events and snapshots.
 * @param id Optional aggregate id; an empty string is used when omitted.
 */
constructor(eventStore: FoxEventStore, id?: string) {
  const blankState: UserState = {
    id: id || '',
    email: '',
    firstName: '',
    lastName: '',
    isActive: false,
    emailVerified: false,
    createdAt: new Date(),
    version: 0
  };

  this.eventStore = eventStore;
  this.state = blankState;
}
/**
 * Rebuilds the aggregate `id` from the event store.
 *
 * When a snapshot exists, the state starts from a shallow copy of the
 * snapshot data and only events from `snapshot.version + 1` are requested;
 * otherwise all events of the aggregate are requested and applied.
 *
 * @param eventStore Store to read the snapshot and events from.
 * @param id Aggregate id to load.
 */
static async load(eventStore: FoxEventStore, id: string): Promise<UserAggregate> {
  const loaded = new UserAggregate(eventStore, id);

  // Try the snapshot first so that only the events after it are requested.
  const snapshot = await eventStore.getSnapshot(id);
  let fromVersion: number | undefined;
  if (snapshot) {
    loaded.state = { ...snapshot.data };
    fromVersion = snapshot.version + 1;
  }

  // Without a snapshot `fromVersion` stays undefined and no version bound is passed.
  const history = await eventStore.getEvents(id, fromVersion);
  loaded.applyEvents(history);
  return loaded;
}
/**
 * Records a 'UserCreated' event for this aggregate.
 *
 * Existence is decided by the version rather than by the id: an aggregate
 * constructed with a pre-assigned id has a non-empty id before any event
 * exists, so an id check would reject every such creation.
 *
 * @param email Email of the new user.
 * @param firstName First name of the new user.
 * @param lastName Last name of the new user.
 * @param userId Optional id of the acting user, stored as `metadata.createdBy`.
 * @throws Error 'User already exists' when an event has already been applied.
 */
async createUser(email: string, firstName: string, lastName: string, userId?: string): Promise<void> {
  // The version is 0 until the first event is applied.
  if (this.state.version > 0) {
    throw new Error('User already exists');
  }

  const event: Omit<DomainEvent, 'eventId' | 'timestamp'> = {
    // Keep a pre-assigned id; otherwise derive one from the current time.
    aggregateId: this.state.id || `user-${Date.now()}`,
    aggregateType: 'User',
    eventType: 'UserCreated',
    eventVersion: this.state.version + 1,
    eventData: {
      email,
      firstName,
      lastName
    },
    metadata: {
      createdBy: userId,
      source: 'user-service'
    }
  };

  this.applyEvent(event);
  // NOTE(review): the cast hides that `eventId` and `timestamp` are still
  // unset here; they are assigned only when the event is appended to the store.
  this.uncommittedEvents.push(event as DomainEvent);
}
/**
 * Records an 'EmailVerified' event for this user.
 *
 * @param userId Optional id of the acting user, stored as `metadata.verifiedBy`.
 * @throws Error 'User does not exist' when no event has been applied yet.
 * @throws Error 'Email already verified' when the email is already verified.
 */
async verifyEmail(userId?: string): Promise<void> {
  // `load()` sets the id even when the store has no events for it, so the id
  // alone does not prove the user was created; the version does.
  if (!this.state.id || this.state.version === 0) {
    throw new Error('User does not exist');
  }
  if (this.state.emailVerified) {
    throw new Error('Email already verified');
  }

  const event: Omit<DomainEvent, 'eventId' | 'timestamp'> = {
    aggregateId: this.state.id,
    aggregateType: 'User',
    eventType: 'EmailVerified',
    eventVersion: this.state.version + 1,
    eventData: {
      verifiedAt: new Date()
    },
    metadata: {
      verifiedBy: userId
    }
  };

  this.applyEvent(event);
  this.uncommittedEvents.push(event as DomainEvent);
}
/**
 * Records a 'UserDeactivated' event for this user.
 *
 * @param reason Why the user is being deactivated; stored in the event data.
 * @param userId Optional id of the acting user, stored as `metadata.deactivatedBy`.
 * @throws Error 'User does not exist' when no event has been applied yet.
 * @throws Error 'User already deactivated' when the user is not active.
 */
async deactivateUser(reason: string, userId?: string): Promise<void> {
  // `load()` sets the id even when the store has no events for it, so the id
  // alone does not prove the user was created; the version does.
  if (!this.state.id || this.state.version === 0) {
    throw new Error('User does not exist');
  }
  if (!this.state.isActive) {
    throw new Error('User already deactivated');
  }

  const event: Omit<DomainEvent, 'eventId' | 'timestamp'> = {
    aggregateId: this.state.id,
    aggregateType: 'User',
    eventType: 'UserDeactivated',
    eventVersion: this.state.version + 1,
    eventData: {
      reason,
      deactivatedAt: new Date()
    },
    metadata: {
      deactivatedBy: userId
    }
  };

  this.applyEvent(event);
  this.uncommittedEvents.push(event as DomainEvent);
}
/**
 * Records a 'UserProfileUpdated' event carrying both the new and the
 * previous first and last name.
 *
 * @param firstName New first name.
 * @param lastName New last name.
 * @param userId Optional id of the acting user, stored as `metadata.updatedBy`.
 * @throws Error 'User does not exist' when no event has been applied yet.
 */
async updateProfile(firstName: string, lastName: string, userId?: string): Promise<void> {
  // `load()` sets the id even when the store has no events for it, so the id
  // alone does not prove the user was created; the version does.
  if (!this.state.id || this.state.version === 0) {
    throw new Error('User does not exist');
  }

  const event: Omit<DomainEvent, 'eventId' | 'timestamp'> = {
    aggregateId: this.state.id,
    aggregateType: 'User',
    eventType: 'UserProfileUpdated',
    eventVersion: this.state.version + 1,
    eventData: {
      firstName,
      lastName,
      // Captured before the event is applied, so these are the old values.
      previousFirstName: this.state.firstName,
      previousLastName: this.state.lastName
    },
    metadata: {
      updatedBy: userId
    }
  };

  this.applyEvent(event);
  this.uncommittedEvents.push(event as DomainEvent);
}
/** Applies `events` to the state one by one, in array order. */
private applyEvents(events: DomainEvent[]): void {
  for (const stored of events) {
    this.applyEvent(stored);
  }
}
/**
 * Folds one event into the in-memory state and advances the version.
 *
 * Event types without a case leave the fields untouched but still advance
 * the version to `event.eventVersion`.
 *
 * @param event A stored event, or a freshly built one that has no
 *   `eventId` and `timestamp` yet.
 */
private applyEvent(event: Omit<DomainEvent, 'eventId' | 'timestamp'> | DomainEvent): void {
  switch (event.eventType) {
    case 'UserCreated':
      this.state.id = event.aggregateId;
      this.state.email = event.eventData.email;
      this.state.firstName = event.eventData.firstName;
      this.state.lastName = event.eventData.lastName;
      this.state.isActive = true;
      this.state.emailVerified = false;
      // A replayed event keeps its recorded time, so the creation date does
      // not change on every load; only an unstored event falls back to "now".
      // NOTE(review): assumes a stored timestamp is a Date or a value `new Date()` can parse — confirm per provider.
      this.state.createdAt =
        'timestamp' in event && event.timestamp ? new Date(event.timestamp) : new Date();
      break;
    case 'EmailVerified':
      this.state.emailVerified = true;
      break;
    case 'UserDeactivated':
      this.state.isActive = false;
      break;
    case 'UserProfileUpdated':
      this.state.firstName = event.eventData.firstName;
      this.state.lastName = event.eventData.lastName;
      break;
  }
  this.state.version = event.eventVersion;
}
/**
 * Appends every uncommitted event to the store, in order, and writes a
 * snapshot when the batch reached or crossed a multiple of 10 versions.
 *
 * Each event leaves the pending list as soon as it is stored, so a retry
 * after a failure does not append already stored events again. With nothing
 * pending the call does nothing, which also avoids writing a snapshot for a
 * blank aggregate at version 0.
 */
async saveChanges(): Promise<void> {
  const pending = this.uncommittedEvents;
  const first = pending[0];
  if (first === undefined) {
    return;
  }
  const firstVersion = first.eventVersion;

  for (let next = pending[0]; next !== undefined; next = pending[0]) {
    await this.eventStore.appendEvent(next);
    pending.shift();
  }

  // Snapshot every 10 events: true when a multiple of 10 lies within
  // [firstVersion, current version], even if the batch stepped over it.
  const snapshotsBefore = Math.floor((firstVersion - 1) / 10);
  const snapshotsNow = Math.floor(this.state.version / 10);
  if (snapshotsNow > snapshotsBefore) {
    await this.eventStore.createSnapshot(this.state.id, this.state, this.state.version);
  }
}
/** Returns a shallow copy of the current state. */
getState(): UserState {
return { ...this.state };
}
/**
 * Returns a copy of the list of events recorded but not yet saved.
 * `saveChanges()` empties that list, so read it before saving.
 */
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents];
}
}
Event Bus Distribuido
// src/events/distributed-event-bus.ts
import { EventBus, EventSubscriber } from '@foxframework/core/events';
/** Envelope for an event published on the distributed event bus. */
export interface DistributedEvent {
eventId: string;
// Name the event is published under; subscribers register by this name.
eventType: string;
aggregateId: string;
aggregateType: string;
// Event-specific data.
payload: any;
metadata: {
// Aggregate version associated with the event.
version: number;
timestamp: Date;
// `DistributedEventBus.publish` falls back to `eventId` when this is not set.
correlationId?: string;
causationId?: string;
userId?: string;
// Name of the publishing component, e.g. 'user-service'.
source: string;
};
}
export class DistributedEventBus {
private eventBus: EventBus;
private subscribers = new Map<string, EventSubscriber[]>();
/**
 * Creates the underlying `EventBus` for the selected provider and installs
 * the error handler.
 *
 * @param provider Messaging backend to use; defaults to `'memory'`.
 */
constructor(provider: 'memory' | 'redis' | 'rabbitmq' | 'kafka' = 'memory') {
  const connection = this.getConnectionConfig(provider);
  const options = {
    retryAttempts: 3,
    retryDelay: 1000,
    deadLetterQueue: true,
    enableMetrics: true
  };

  this.eventBus = new EventBus({ provider, connection, options });
  this.setupErrorHandling();
}
/**
 * Resolves the connection settings for `provider` from environment variables.
 *
 * - Redis: a missing or non-numeric `REDIS_PORT` falls back to 6379. The
 *   database is read from `REDIS_EVENT_BUS_DB` first, then from
 *   `REDIS_EVENTS_DB` (the variable the event store also reads), then
 *   defaults to `'2'`, so the bus can be pointed at its own database.
 * - Kafka: `KAFKA_BROKERS` is a comma-separated list; blank entries are
 *   dropped, and an empty list falls back to `localhost:9092`.
 *
 * @param provider Name of the messaging backend.
 * @returns Connection settings for the backend, or `{}` for any other provider.
 */
private getConnectionConfig(provider: string) {
  switch (provider) {
    case 'redis': {
      const parsedPort = Number.parseInt(process.env.REDIS_PORT ?? '', 10);
      return {
        host: process.env.REDIS_HOST || 'localhost',
        port: Number.isNaN(parsedPort) ? 6379 : parsedPort,
        database: process.env.REDIS_EVENT_BUS_DB || process.env.REDIS_EVENTS_DB || '2'
      };
    }
    case 'rabbitmq':
      return {
        url: process.env.RABBITMQ_URL || 'amqp://localhost:5672',
        exchange: 'fox-events',
        exchangeType: 'topic'
      };
    case 'kafka': {
      // ''.split(',') yields [''], which is truthy, so filter before falling back.
      const brokers = (process.env.KAFKA_BROKERS ?? '')
        .split(',')
        .map(broker => broker.trim())
        .filter(broker => broker.length > 0);
      return {
        brokers: brokers.length > 0 ? brokers : ['localhost:9092'],
        clientId: 'fox-framework',
        groupId: 'fox-events'
      };
    }
    default:
      return {};
  }
}
/**
 * Publishes `event` under its `eventType` after filling in missing metadata.
 *
 * Defaults are applied only where the caller left a field empty:
 * `timestamp` becomes the current time, `correlationId` becomes the event
 * id, and `source` becomes `'unknown'`. A timestamp supplied by the caller
 * is kept, so subscribers see when the event happened rather than when it
 * was published.
 *
 * @param event Event to publish; the argument itself is not modified.
 */
async publish(event: DistributedEvent): Promise<void> {
  // Add the metadata the caller did not provide.
  const enrichedEvent: DistributedEvent = {
    ...event,
    metadata: {
      ...event.metadata,
      timestamp: event.metadata.timestamp ?? new Date(),
      correlationId: event.metadata.correlationId || event.eventId,
      source: event.metadata.source || 'unknown'
    }
  };

  await this.eventBus.publish(enrichedEvent.eventType, enrichedEvent);
  console.log(`📡 Event published: ${enrichedEvent.eventType} (${enrichedEvent.eventId})`);
}
/**
 * Subscribes `handler` to events published under `eventType`.
 *
 * The handler is wrapped so that receipt, success and failure are logged.
 * A failure is rethrown so the underlying bus sees it.
 *
 * @param eventType Event name to listen for.
 * @param handler Callback invoked with each received event.
 */
async subscribe(eventType: string, handler: (event: DistributedEvent) => Promise<void>): Promise<void> {
  const loggedHandler = async (data: any) => {
    try {
      console.log(`📥 Event received: ${eventType} (${data.eventId})`);
      await handler(data);
      console.log(`✅ Event processed: ${eventType} (${data.eventId})`);
    } catch (error) {
      console.error(`❌ Event processing failed: ${eventType} (${data.eventId})`, error);
      throw error; // Rethrow so the event bus can handle the retry
    }
  };
  const subscriber: EventSubscriber = { eventType, handler: loggedHandler };

  // Keep a local record of the subscribers per event type.
  const registered = this.subscribers.get(eventType);
  if (registered) {
    registered.push(subscriber);
  } else {
    this.subscribers.set(eventType, [subscriber]);
  }

  await this.eventBus.subscribe(eventType, subscriber.handler);
  console.log(`🔔 Subscribed to event: ${eventType}`);
}
/**
 * Subscribes `handler` to every event matched by `pattern`.
 *
 * The handler is wrapped so that receipt, success and failure are logged;
 * a failure is rethrown to the underlying bus.
 *
 * @param pattern Pattern passed unchanged to the underlying bus.
 * @param handler Callback invoked with each received event.
 */
async subscribeToPattern(pattern: string, handler: (event: DistributedEvent) => Promise<void>): Promise<void> {
  const loggedHandler = async (data: any) => {
    try {
      console.log(`📥 Pattern event received: ${pattern} -> ${data.eventType} (${data.eventId})`);
      await handler(data);
      console.log(`✅ Pattern event processed: ${pattern} -> ${data.eventType} (${data.eventId})`);
    } catch (error) {
      console.error(`❌ Pattern event processing failed: ${pattern} -> ${data.eventType}`, error);
      throw error;
    }
  };

  await this.eventBus.subscribeToPattern(pattern, loggedHandler);
  console.log(`🔔 Subscribed to pattern: ${pattern}`);
}
/** Registers a bus error callback that logs the error and, when present, the failed event. */
private setupErrorHandling(): void {
  const report = (error: Error, event?: any) => {
    console.error('🚨 Event bus error:', error);
    if (!event) {
      return;
    }
    console.error('🚨 Failed event:', event);
    // This is where the event could be sent to a dead letter queue
    // or to a monitoring system
  };

  this.eventBus.onError(report);
}
/** Returns whatever metrics the underlying event bus reports. */
async getMetrics(): Promise<any> {
return this.eventBus.getMetrics();
}
/** Disconnects the underlying event bus and logs once it has finished. */
async disconnect(): Promise<void> {
await this.eventBus.disconnect();
console.log('🔌 Event bus disconnected');
}
}
Projections y Read Models
// src/projections/user-projection.ts
/** Read model of a user, kept up to date from user and order events. */
export interface UserProjection {
id: string;
email: string;
// First and last name joined with a single space.
fullName: string;
isActive: boolean;
emailVerified: boolean;
createdAt: Date;
// Timestamp of the last event applied to this projection.
lastUpdated: Date;
// Incremented by one for each 'OrderCompleted' event of this user.
totalOrders?: number;
// Running sum of `payload.total` over this user's 'OrderCompleted' events.
totalSpent?: number;
}
export class UserProjectionHandler {
private projections = new Map<string, UserProjection>();
/**
 * Starts subscribing to the events this projection depends on.
 *
 * A constructor cannot await, so the subscriptions are set up in the
 * background; a failure is logged instead of becoming an unhandled
 * promise rejection.
 *
 * @param eventBus Bus to subscribe on.
 */
constructor(private eventBus: DistributedEventBus) {
  this.setupSubscriptions().catch((error: unknown) => {
    console.error('❌ Failed to set up user projection subscriptions:', error);
  });
}
/**
 * Subscribes the projection handlers one after another, in a fixed order:
 * the four user events first, then 'OrderCompleted', which feeds the order
 * statistics.
 */
private async setupSubscriptions(): Promise<void> {
  const bindings: Array<[string, (event: DistributedEvent) => Promise<void>]> = [
    // User events
    ['UserCreated', this.handleUserCreated],
    ['EmailVerified', this.handleEmailVerified],
    ['UserDeactivated', this.handleUserDeactivated],
    ['UserProfileUpdated', this.handleUserProfileUpdated],
    // Order events, used to update the statistics
    ['OrderCompleted', this.handleOrderCompleted]
  ];

  for (const [eventType, handler] of bindings) {
    await this.eventBus.subscribe(eventType, handler.bind(this));
  }
}
/**
 * Creates the projection for a new user, replacing any projection already
 * stored under the same aggregate id.
 */
private async handleUserCreated(event: DistributedEvent): Promise<void> {
  const { aggregateId, payload, metadata } = event;
  const occurredAt = metadata.timestamp;

  const projection: UserProjection = {
    id: aggregateId,
    email: payload.email,
    fullName: `${payload.firstName} ${payload.lastName}`,
    isActive: true,
    emailVerified: false,
    createdAt: occurredAt,
    lastUpdated: occurredAt,
    totalOrders: 0,
    totalSpent: 0
  };

  this.projections.set(aggregateId, projection);
  console.log(`👤 User projection created: ${projection.email}`);
}
/** Marks the user's email as verified; ignored when the user has no projection. */
private async handleEmailVerified(event: DistributedEvent): Promise<void> {
  const target = this.projections.get(event.aggregateId);
  if (!target) {
    return;
  }
  target.emailVerified = true;
  target.lastUpdated = event.metadata.timestamp;
  console.log(`✅ User projection updated - email verified: ${target.email}`);
}
/** Marks the user as inactive; ignored when the user has no projection. */
private async handleUserDeactivated(event: DistributedEvent): Promise<void> {
  const target = this.projections.get(event.aggregateId);
  if (!target) {
    return;
  }
  target.isActive = false;
  target.lastUpdated = event.metadata.timestamp;
  console.log(`🚫 User projection updated - deactivated: ${target.email}`);
}
/** Rebuilds `fullName` from the event payload; ignored when the user has no projection. */
private async handleUserProfileUpdated(event: DistributedEvent): Promise<void> {
  const target = this.projections.get(event.aggregateId);
  if (!target) {
    return;
  }
  const { firstName, lastName } = event.payload;
  target.fullName = `${firstName} ${lastName}`;
  target.lastUpdated = event.metadata.timestamp;
  console.log(`📝 User projection updated - profile: ${target.email}`);
}
/**
 * Adds a completed order to the statistics of the user named in
 * `payload.userId`; ignored when that user has no projection.
 */
private async handleOrderCompleted(event: DistributedEvent): Promise<void> {
  const buyer = this.projections.get(event.payload.userId);
  if (!buyer) {
    return;
  }

  const ordersSoFar = buyer.totalOrders || 0;
  const spentSoFar = buyer.totalSpent || 0;
  buyer.totalOrders = ordersSoFar + 1;
  buyer.totalSpent = spentSoFar + event.payload.total;
  buyer.lastUpdated = event.metadata.timestamp;
  console.log(`💰 User projection updated - order stats: ${buyer.email}`);
}
/** Returns the projection stored for `userId`, or `null` when there is none. */
getUserProjection(userId: string): UserProjection | null {
  const found = this.projections.get(userId);
  return found ?? null;
}
/** Returns all projections as a new array, in insertion order. */
getAllUserProjections(): UserProjection[] {
  return [...this.projections.values()];
}
/**
 * Returns the users whose email or full name contains `searchTerm`,
 * compared case-insensitively.
 */
searchUsers(searchTerm: string): UserProjection[] {
  const needle = searchTerm.toLowerCase();
  const matches: UserProjection[] = [];
  for (const candidate of this.projections.values()) {
    const emailHit = candidate.email.toLowerCase().includes(needle);
    if (emailHit || candidate.fullName.toLowerCase().includes(needle)) {
      matches.push(candidate);
    }
  }
  return matches;
}
/**
 * Aggregates counts over all projections: number of users, active users,
 * verified users, and the sum of `totalSpent` (a missing value counts as 0).
 */
getUserStatistics(): {
  total: number;
  active: number;
  verified: number;
  totalSpent: number;
} {
  let total = 0;
  let active = 0;
  let verified = 0;
  let totalSpent = 0;

  for (const user of this.projections.values()) {
    total += 1;
    if (user.isActive) {
      active += 1;
    }
    if (user.emailVerified) {
      verified += 1;
    }
    totalSpent += user.totalSpent || 0;
  }

  return { total, active, verified, totalSpent };
}
}
Command Handlers
// src/handlers/user-command-handlers.ts
import { CommandHandler, Command } from '../cqrs/command-handler';
import { UserAggregate } from '../domain/user-aggregate';
import { FoxEventStore } from '../events/event-store';
import { DistributedEventBus } from '../events/distributed-event-bus';
// Commands
/** Command that asks for a new user to be created. */
export interface CreateUserCommand extends Command {
commandType: 'CreateUser';
payload: {
email: string;
firstName: string;
lastName: string;
};
}
/** Command that asks for a user's email to be marked as verified. */
export interface VerifyEmailCommand extends Command {
commandType: 'VerifyEmail';
payload: {
// Id of the user aggregate to load.
userId: string;
};
}
/**
 * Command that asks for a user's name to be changed.
 * NOTE(review): no handler for this command is defined in this example.
 */
export interface UpdateUserProfileCommand extends Command {
commandType: 'UpdateUserProfile';
payload: {
userId: string;
firstName: string;
lastName: string;
};
}
/** Handles 'CreateUser': creates the aggregate, persists its events and publishes them. */
export class CreateUserCommandHandler implements CommandHandler<CreateUserCommand> {
  constructor(
    private eventStore: FoxEventStore,
    private eventBus: DistributedEventBus
  ) {}

  /**
   * Creates the user described by `command`.
   *
   * @param command Command carrying the email and names of the new user.
   * @throws Error when a 'UserCreated' event with the same email already exists.
   */
  async handle(command: CreateUserCommand): Promise<void> {
    // Check that the email is not taken (simplified for the example).
    const existingEvents = await this.eventStore.getEventsByType('UserCreated');
    const emailExists = existingEvents.some(e => e.eventData.email === command.payload.email);
    if (emailExists) {
      throw new Error(`User with email ${command.payload.email} already exists`);
    }

    // Create the aggregate.
    // NOTE(review): `command.aggregateId` is not used; the aggregate derives its own id.
    const userAggregate = new UserAggregate(this.eventStore);
    await userAggregate.createUser(
      command.payload.email,
      command.payload.firstName,
      command.payload.lastName,
      command.userId
    );

    // Capture the pending events BEFORE saving: saveChanges() empties the
    // uncommitted list, so reading it afterwards publishes nothing.
    const eventsToPublish = userAggregate.getUncommittedEvents();

    // Persist the events.
    await userAggregate.saveChanges();

    // Publish the events to other services.
    // NOTE(review): the aggregate records events before the store assigns
    // `eventId` and `timestamp`, so both may be undefined here — confirm.
    for (const event of eventsToPublish) {
      await this.eventBus.publish({
        eventId: event.eventId,
        eventType: event.eventType,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        payload: event.eventData,
        metadata: {
          version: event.eventVersion,
          timestamp: event.timestamp,
          correlationId: command.commandId,
          userId: command.userId,
          source: 'user-service'
        }
      });
    }
  }
}
export class VerifyEmailCommandHandler implements CommandHandler<VerifyEmailCommand> {
constructor(
private eventStore: FoxEventStore,
private eventBus: DistributedEventBus
) {}
/**
 * Loads the user named in `command.payload.userId`, verifies the email,
 * persists the resulting events and publishes them.
 *
 * @param command Command naming the user whose email is verified.
 * @throws Error whatever `UserAggregate.verifyEmail` throws, e.g. when the
 *   email is already verified.
 */
async handle(command: VerifyEmailCommand): Promise<void> {
  const userAggregate = await UserAggregate.load(this.eventStore, command.payload.userId);
  await userAggregate.verifyEmail(command.userId);

  // Capture the pending events BEFORE saving: saveChanges() empties the
  // uncommitted list, so reading it afterwards publishes nothing.
  const eventsToPublish = userAggregate.getUncommittedEvents();
  await userAggregate.saveChanges();

  // Publish the events.
  // NOTE(review): the aggregate records events before the store assigns
  // `eventId` and `timestamp`, so both may be undefined here — confirm.
  for (const event of eventsToPublish) {
    await this.eventBus.publish({
      eventId: event.eventId,
      eventType: event.eventType,
      aggregateId: event.aggregateId,
      aggregateType: event.aggregateType,
      payload: event.eventData,
      metadata: {
        version: event.eventVersion,
        timestamp: event.timestamp,
        correlationId: command.commandId,
        userId: command.userId,
        source: 'user-service'
      }
    });
  }
}
}
Notification Service Integration
// src/services/notification-event-handler.ts
export class NotificationEventHandler {
/**
 * Starts subscribing to the events that trigger notifications.
 *
 * A constructor cannot await, so the subscriptions are set up in the
 * background; a failure is logged instead of becoming an unhandled
 * promise rejection.
 *
 * @param eventBus Bus used both to subscribe and to publish notification requests.
 */
constructor(private eventBus: DistributedEventBus) {
  this.setupSubscriptions().catch((error: unknown) => {
    console.error('❌ Failed to set up notification subscriptions:', error);
  });
}
/** Subscribes, one after another, to the events that require a notification. */
private async setupSubscriptions(): Promise<void> {
  const bindings: Array<[string, (event: DistributedEvent) => Promise<void>]> = [
    ['UserCreated', this.handleUserCreated],
    ['EmailVerified', this.handleEmailVerified],
    ['OrderCompleted', this.handleOrderCompleted],
    ['OrderShipped', this.handleOrderShipped]
  ];

  for (const [eventType, handler] of bindings) {
    await this.eventBus.subscribe(eventType, handler.bind(this));
  }
}
/**
 * Requests the welcome email for a newly created user.
 * NOTE(review): the verification URL ends in the literal placeholder `token=...`.
 */
private async handleUserCreated(event: DistributedEvent): Promise<void> {
  const { payload, eventId, aggregateId } = event;

  // Send the welcome email
  const welcome = {
    type: 'email',
    to: payload.email,
    template: 'welcome',
    data: {
      firstName: payload.firstName,
      verificationUrl: `${process.env.FRONTEND_URL}/verify-email?token=...`
    },
    metadata: {
      eventId,
      userId: aggregateId
    }
  };
  await this.sendNotification(welcome);
}
/**
 * Requests the "email verified" confirmation email.
 *
 * The 'EmailVerified' event built by `UserAggregate.verifyEmail` carries
 * only `verifiedAt`, so `payload.email` can be missing. In that case the
 * notification is skipped with a warning rather than requested with an
 * undefined recipient.
 */
private async handleEmailVerified(event: DistributedEvent): Promise<void> {
  const recipient = event.payload?.email;
  if (typeof recipient !== 'string' || recipient === '') {
    console.warn(`⚠️ EmailVerified event (${event.eventId}) has no recipient email; notification skipped`);
    return;
  }

  // Send the "email verified" confirmation
  await this.sendNotification({
    type: 'email',
    to: recipient,
    template: 'email-verified',
    data: {
      dashboardUrl: `${process.env.FRONTEND_URL}/dashboard`
    },
    metadata: {
      eventId: event.eventId,
      userId: event.aggregateId
    }
  });
}
/** Requests the order confirmation email for a completed order. */
private async handleOrderCompleted(event: DistributedEvent): Promise<void> {
  const { payload } = event;

  // Send the order confirmation
  const confirmation = {
    type: 'email',
    to: payload.customerEmail,
    template: 'order-confirmation',
    data: {
      orderNumber: payload.orderNumber,
      total: payload.total,
      items: payload.items,
      trackingUrl: `${process.env.FRONTEND_URL}/orders/${payload.orderNumber}/track`
    },
    metadata: {
      eventId: event.eventId,
      orderId: event.aggregateId,
      userId: payload.userId
    }
  };
  await this.sendNotification(confirmation);
}
/**
 * Requests two notifications for a shipped order: first an email to the
 * customer, then a push notification addressed to the user id.
 */
private async handleOrderShipped(event: DistributedEvent): Promise<void> {
  const { payload, eventId, aggregateId: orderId } = event;

  // Send the shipping email
  const shippedEmail = {
    type: 'email',
    to: payload.customerEmail,
    template: 'order-shipped',
    data: {
      orderNumber: payload.orderNumber,
      trackingNumber: payload.trackingNumber,
      estimatedDelivery: payload.estimatedDelivery,
      trackingUrl: payload.trackingUrl
    },
    metadata: {
      eventId,
      orderId,
      userId: payload.userId
    }
  };
  await this.sendNotification(shippedEmail);

  // Also send a push notification in case the user has the app
  const shippedPush = {
    type: 'push',
    to: payload.userId,
    template: 'order-shipped-push',
    data: {
      title: 'Order Shipped!',
      body: `Your order ${payload.orderNumber} has been shipped`,
      orderNumber: payload.orderNumber
    },
    metadata: {
      eventId,
      orderId
    }
  };
  await this.sendNotification(shippedPush);
}
/**
 * Publishes a 'NotificationRequested' event carrying `notification` as its
 * payload, so that a notification service can process it.
 *
 * The aggregate id is `metadata.userId` when truthy, otherwise `metadata.orderId`.
 *
 * @param notification Notification description; must have a `metadata` object.
 */
private async sendNotification(notification: any): Promise<void> {
  const meta = notification.metadata;
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  const requestId = `notif-${Date.now()}-${randomSuffix}`;

  await this.eventBus.publish({
    eventId: requestId,
    eventType: 'NotificationRequested',
    aggregateId: meta.userId || meta.orderId,
    aggregateType: 'Notification',
    payload: notification,
    metadata: {
      version: 1,
      timestamp: new Date(),
      source: 'notification-handler'
    }
  });
}
}
Servidor Principal con Eventos
// src/server.ts
import { FoxFactory, Request, Response } from '@foxframework/core';
import { FoxEventStore } from './events/event-store';
import { DistributedEventBus } from './events/distributed-event-bus';
import { CommandBus } from './cqrs/command-handler';
import { QueryBus } from './cqrs/query-handler';
import { UserProjectionHandler } from './projections/user-projection';
import { CreateUserCommandHandler, VerifyEmailCommandHandler } from './handlers/user-command-handlers';
import { NotificationEventHandler } from './services/notification-event-handler';
class EventDrivenServer {
private eventStore: FoxEventStore;
private eventBus: DistributedEventBus;
private commandBus: CommandBus;
private queryBus: QueryBus;
private userProjectionHandler: UserProjectionHandler;
private notificationHandler: NotificationEventHandler;
/**
 * Wires the event store, the event bus, the command and query buses, the
 * projection and notification handlers, and registers the command handlers.
 * Providers come from `EVENT_STORE_PROVIDER` and `EVENT_BUS_PROVIDER`,
 * defaulting to `'memory'`.
 */
constructor() {
  // Event system
  const storeProvider = process.env.EVENT_STORE_PROVIDER as any || 'memory';
  const busProvider = process.env.EVENT_BUS_PROVIDER as any || 'memory';
  this.eventStore = new FoxEventStore(storeProvider);
  this.eventBus = new DistributedEventBus(busProvider);

  // CQRS buses
  this.commandBus = new CommandBus();
  this.queryBus = new QueryBus();

  // Projections and event handlers
  this.userProjectionHandler = new UserProjectionHandler(this.eventBus);
  this.notificationHandler = new NotificationEventHandler(this.eventBus);

  this.setupCommandHandlers();
}
/** Registers the handlers for the 'CreateUser' and 'VerifyEmail' commands. */
private setupCommandHandlers(): void {
  const { commandBus, eventStore, eventBus } = this;

  commandBus.register('CreateUser', new CreateUserCommandHandler(eventStore, eventBus));
  commandBus.register('VerifyEmail', new VerifyEmailCommandHandler(eventStore, eventBus));
}
// POST /users
/**
 * Validates the request body and dispatches a 'CreateUser' command.
 *
 * Responds 202 with the command id on success, and 400 when a required
 * field is missing or blank or when the command fails.
 */
createUser = async (req: Request, res: Response): Promise<void> => {
  try {
    // `req.body` can be undefined when the request has no parsed body.
    const { email, firstName, lastName } = req.body ?? {};

    // Reject missing or blank fields up front so that no user is created
    // with undefined data.
    const invalidFields = Object.entries({ email, firstName, lastName })
      .filter(([, value]) => typeof value !== 'string' || value.trim() === '')
      .map(([field]) => field);
    if (invalidFields.length > 0) {
      res.status(400).json({
        error: 'Validation failed',
        message: `Missing or invalid fields: ${invalidFields.join(', ')}`
      });
      return;
    }

    const command = {
      commandId: `cmd-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
      aggregateId: `user-${Date.now()}`,
      commandType: 'CreateUser' as const,
      payload: { email, firstName, lastName },
      timestamp: new Date(),
      userId: req.user?.id
    };

    await this.commandBus.execute(command);

    res.status(202).json({
      message: 'User creation initiated',
      commandId: command.commandId
    });
  } catch (error) {
    res.status(400).json({
      error: 'Command failed',
      message: error instanceof Error ? error.message : 'Unknown error'
    });
  }
};
// POST /users/:id/verify-email
/**
 * Dispatches a 'VerifyEmail' command for the user in the `:id` route parameter.
 * Responds with the command id on success and 400 when the command fails.
 */
verifyEmail = async (req: Request, res: Response): Promise<void> => {
  try {
    const userId = req.params.id;
    const randomSuffix = Math.random().toString(36).slice(2, 11);
    const commandId = `cmd-${Date.now()}-${randomSuffix}`;

    await this.commandBus.execute({
      commandId,
      aggregateId: userId,
      commandType: 'VerifyEmail' as const,
      payload: { userId },
      timestamp: new Date(),
      userId: req.user?.id
    });

    res.json({
      message: 'Email verification initiated',
      commandId
    });
  } catch (error) {
    const message = error instanceof Error ? error.message : 'Unknown error';
    res.status(400).json({
      error: 'Command failed',
      message
    });
  }
};
// GET /users
/** Responds with every user projection under `data`. */
getUsers = async (req: Request, res: Response): Promise<void> => {
  res.json({ data: this.userProjectionHandler.getAllUserProjections() });
};
// GET /users/:id
/**
 * Responds with the projection of the user in the `:id` route parameter,
 * or 404 when no such projection exists.
 */
getUser = async (req: Request, res: Response): Promise<void> => {
  const userId = req.params.id;
  const user = this.userProjectionHandler.getUserProjection(userId);

  if (!user) {
    // Send the response and return nothing, as `Promise<void>` requires.
    res.status(404).json({
      error: 'User not found',
      message: `User with ID ${userId} not found`
    });
    return;
  }

  res.json({ data: user });
};
// GET /users/search
/**
 * Responds with the users matching the `q` query parameter.
 *
 * Responds 400 when `q` is missing, empty, or not a single string (for
 * example when it is repeated and parsed into an array), because the
 * search needs a plain string.
 */
searchUsers = async (req: Request, res: Response): Promise<void> => {
  const { q } = req.query;

  if (typeof q !== 'string' || q === '') {
    // Send the response and return nothing, as `Promise<void>` requires.
    res.status(400).json({
      error: 'Search term required',
      message: 'Query parameter "q" is required'
    });
    return;
  }

  const users = this.userProjectionHandler.searchUsers(q);
  res.json({ data: users });
};
// GET /analytics/users
/** Responds with the aggregated user statistics under `data`. */
getUserAnalytics = async (req: Request, res: Response): Promise<void> => {
  res.json({ data: this.userProjectionHandler.getUserStatistics() });
};
// GET /events/metrics
/**
 * Responds with the event bus metrics under `data`.
 * Responds 500 when the metrics cannot be read, so the request is always
 * answered and the rejection does not go unhandled.
 */
getEventMetrics = async (req: Request, res: Response): Promise<void> => {
  try {
    const metrics = await this.eventBus.getMetrics();
    res.json({ data: metrics });
  } catch (error) {
    res.status(500).json({
      error: 'Metrics unavailable',
      message: error instanceof Error ? error.message : 'Unknown error'
    });
  }
};
/**
 * Builds the HTTP server on port 3000 with the command, query and health routes.
 *
 * `/users/search` is listed before `/users/:id`: the parameterised route
 * also matches the literal path `/users/search`, so listing it first would
 * send search requests to `getUser` with id `search`.
 * NOTE(review): assumes routes are matched in registration order — confirm for FoxFactory.
 */
createServer() {
  return FoxFactory.createServer({
    port: 3000,
    routes: [
      // Command endpoints (writes)
      { path: '/users', method: 'post', handler: this.createUser },
      { path: '/users/:id/verify-email', method: 'post', handler: this.verifyEmail },

      // Query endpoints (reads); static paths before parameterised ones
      { path: '/users', method: 'get', handler: this.getUsers },
      { path: '/users/search', method: 'get', handler: this.searchUsers },
      { path: '/users/:id', method: 'get', handler: this.getUser },
      { path: '/analytics/users', method: 'get', handler: this.getUserAnalytics },
      { path: '/events/metrics', method: 'get', handler: this.getEventMetrics },

      // Health check
      {
        path: '/health',
        method: 'get',
        handler: (req: Request, res: Response) => {
          // NOTE(review): both statuses are constants; nothing is actually probed.
          res.json({
            status: 'healthy',
            timestamp: new Date().toISOString(),
            eventStore: 'connected',
            eventBus: 'connected'
          });
        }
      }
    ]
  });
}
/**
 * Disconnects the event bus and logs completion.
 * Only the event bus is touched here; the method does not stop the HTTP
 * server or exit the process.
 */
async shutdown(): Promise<void> {
await this.eventBus.disconnect();
console.log('🔌 Event-driven server shutdown complete');
}
}
// Initialize and start the server
const server = new EventDrivenServer();
const app = server.createServer();

app
  .start()
  .then(() => {
    console.log('🦊 Event-driven server running on http://localhost:3000');
    console.log('📡 Event system initialized');
    console.log('📚 Available endpoints:');
    console.log(' POST /users - Create user (command)');
    console.log(' POST /users/:id/verify-email - Verify email (command)');
    console.log(' GET /users - List users (query)');
    console.log(' GET /users/:id - Get user (query)');
    console.log(' GET /users/search - Search users (query)');
    console.log(' GET /analytics/users - User analytics (query)');
    console.log(' GET /events/metrics - Event metrics (query)');
  })
  .catch((error: unknown) => {
    // A failed start must not stay an unhandled rejection.
    console.error('❌ Failed to start event-driven server:', error);
    process.exit(1);
  });

// Graceful shutdown: registering a signal listener removes Node's default
// "exit on signal" behaviour, so the process has to be ended explicitly once
// the event bus is disconnected.
let shuttingDown = false;
const shutdownAndExit = (signal: string): void => {
  if (shuttingDown) {
    return; // ignore repeated signals while a shutdown is in progress
  }
  shuttingDown = true;
  server
    .shutdown()
    .then(() => process.exit(0))
    .catch((error: unknown) => {
      console.error(`❌ Shutdown after ${signal} failed:`, error);
      process.exit(1);
    });
};
process.on('SIGTERM', () => shutdownAndExit('SIGTERM'));
process.on('SIGINT', () => shutdownAndExit('SIGINT'));
Variables de Entorno
# .env
# Event Store Configuration
EVENT_STORE_PROVIDER=redis # memory, redis, postgresql, mongodb
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_EVENTS_DB=1
# Event Bus Configuration
EVENT_BUS_PROVIDER=redis # memory, redis, rabbitmq, kafka
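# NOTE: REDIS_EVENTS_DB is already set above (=1). The event store and the event bus
# both read this same variable, so only one of the two values takes effect and both
# components end up on the same Redis database.
REDIS_EVENTS_DB=2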
# RabbitMQ (opcional)
RABBITMQ_URL=amqp://localhost:5672
# Kafka (opcional)
KAFKA_BROKERS=localhost:9092
# Application
NODE_ENV=development
FRONTEND_URL=http://localhost:3001
Características Destacadas
- Event Sourcing: Almacenamiento completo del historial de eventos
- CQRS: Separación de comandos y consultas para escalabilidad
- Event Bus Distribuido: Mensajería asíncrona entre servicios
- Projections: Vistas materializadas para consultas optimizadas
- Multi-Provider Support: Memory, Redis, RabbitMQ, Kafka
- Snapshots: Optimización de carga de agregados
- Circuit Breakers: Resistencia a fallos en eventos distribuidos
- Correlation IDs: Trazabilidad de eventos relacionados
- Dead Letter Queues: Manejo de eventos fallidos
- Event Replay: Capacidad de reproducir eventos históricos