Ejemplos
Sistema de Eventos

Sistema de Eventos

Este ejemplo muestra cómo implementar un sistema de eventos completo con Event Sourcing, CQRS, mensajería distribuida y integración con diferentes proveedores como Redis, RabbitMQ y Kafka.

Event Store y Event Sourcing

// src/events/event-store.ts
import { EventStore, Event, EventProvider } from '@foxframework/core/events';
 
export interface DomainEvent {
  eventId: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  eventVersion: number;
  eventData: any;
  metadata?: Record<string, any>;
  timestamp: Date;
  userId?: string;
}
 
export class FoxEventStore {
  private eventStore: EventStore;
 
  constructor(provider: 'memory' | 'redis' | 'postgresql' | 'mongodb' = 'memory') {
    this.eventStore = new EventStore({
      provider,
      connection: this.getConnectionConfig(provider),
      options: {
        snapshotFrequency: 10,
        enableSnapshots: true,
        retentionDays: 365
      }
    });
  }
 
  private getConnectionConfig(provider: string) {
    switch (provider) {
      case 'redis':
        return {
          host: process.env.REDIS_HOST || 'localhost',
          port: parseInt(process.env.REDIS_PORT || '6379'),
          database: process.env.REDIS_EVENTS_DB || '1'
        };
      case 'postgresql':
        return {
          host: process.env.DB_HOST || 'localhost',
          port: parseInt(process.env.DB_PORT || '5432'),
          database: process.env.DB_NAME || 'fox_events',
          username: process.env.DB_USER || 'postgres',
          password: process.env.DB_PASS || 'password'
        };
      default:
        return {};
    }
  }
 
  async appendEvent(event: Omit<DomainEvent, 'eventId' | 'timestamp'>): Promise<void> {
    const domainEvent: DomainEvent = {
      ...event,
      eventId: `evt-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
      timestamp: new Date()
    };
 
    await this.eventStore.append(domainEvent.aggregateId, [domainEvent]);
    console.log(`📝 Event stored: ${domainEvent.eventType} for ${domainEvent.aggregateType}:${domainEvent.aggregateId}`);
  }
 
  async getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]> {
    return this.eventStore.getEvents(aggregateId, fromVersion);
  }
 
  async getEventsByType(eventType: string, limit?: number): Promise<DomainEvent[]> {
    return this.eventStore.getEventsByType(eventType, limit);
  }
 
  async replay(aggregateId: string, toVersion?: number): Promise<DomainEvent[]> {
    return this.eventStore.replay(aggregateId, toVersion);
  }
 
  async createSnapshot(aggregateId: string, data: any, version: number): Promise<void> {
    await this.eventStore.saveSnapshot(aggregateId, data, version);
  }
 
  async getSnapshot(aggregateId: string): Promise<any> {
    return this.eventStore.getSnapshot(aggregateId);
  }
}

CQRS Implementation

// src/cqrs/command-handler.ts
export interface Command {
  commandId: string;
  aggregateId: string;
  commandType: string;
  payload: any;
  metadata?: Record<string, any>;
  timestamp: Date;
  userId?: string;
}
 
export interface CommandHandler<T extends Command> {
  handle(command: T): Promise<void>;
}
 
export class CommandBus {
  private handlers = new Map<string, CommandHandler<any>>();
 
  register<T extends Command>(commandType: string, handler: CommandHandler<T>): void {
    this.handlers.set(commandType, handler);
    console.log(`📋 Command handler registered: ${commandType}`);
  }
 
  async execute<T extends Command>(command: T): Promise<void> {
    const handler = this.handlers.get(command.commandType);
    if (!handler) {
      throw new Error(`No handler registered for command: ${command.commandType}`);
    }
 
    console.log(`⚡ Executing command: ${command.commandType} for ${command.aggregateId}`);
    await handler.handle(command);
  }
}
 
// src/cqrs/query-handler.ts
export interface Query {
  queryId: string;
  queryType: string;
  parameters: Record<string, any>;
  timestamp: Date;
  userId?: string;
}
 
export interface QueryHandler<T extends Query, R> {
  handle(query: T): Promise<R>;
}
 
export class QueryBus {
  private handlers = new Map<string, QueryHandler<any, any>>();
 
  register<T extends Query, R>(queryType: string, handler: QueryHandler<T, R>): void {
    this.handlers.set(queryType, handler);
    console.log(`🔍 Query handler registered: ${queryType}`);
  }
 
  async execute<T extends Query, R>(query: T): Promise<R> {
    const handler = this.handlers.get(query.queryType);
    if (!handler) {
      throw new Error(`No handler registered for query: ${query.queryType}`);
    }
 
    console.log(`🔎 Executing query: ${query.queryType}`);
    return handler.handle(query);
  }
}

Domain Aggregates con Event Sourcing

// src/domain/user-aggregate.ts
import { FoxEventStore, DomainEvent } from '../events/event-store';
 
export interface UserState {
  id: string;
  email: string;
  firstName: string;
  lastName: string;
  isActive: boolean;
  emailVerified: boolean;
  createdAt: Date;
  version: number;
}
 
export class UserAggregate {
  private state: UserState;
  private eventStore: FoxEventStore;
  private uncommittedEvents: DomainEvent[] = [];
 
  constructor(eventStore: FoxEventStore, id?: string) {
    this.eventStore = eventStore;
    this.state = {
      id: id || '',
      email: '',
      firstName: '',
      lastName: '',
      isActive: false,
      emailVerified: false,
      createdAt: new Date(),
      version: 0
    };
  }
 
  static async load(eventStore: FoxEventStore, id: string): Promise<UserAggregate> {
    const aggregate = new UserAggregate(eventStore, id);
    
    // Intentar cargar desde snapshot
    const snapshot = await eventStore.getSnapshot(id);
    if (snapshot) {
      aggregate.state = { ...snapshot.data };
      const events = await eventStore.getEvents(id, snapshot.version + 1);
      aggregate.applyEvents(events);
    } else {
      // Cargar todos los eventos desde el inicio
      const events = await eventStore.getEvents(id);
      aggregate.applyEvents(events);
    }
 
    return aggregate;
  }
 
  async createUser(email: string, firstName: string, lastName: string, userId?: string): Promise<void> {
    if (this.state.id) {
      throw new Error('User already exists');
    }
 
    const event: Omit<DomainEvent, 'eventId' | 'timestamp'> = {
      aggregateId: this.state.id || `user-${Date.now()}`,
      aggregateType: 'User',
      eventType: 'UserCreated',
      eventVersion: this.state.version + 1,
      eventData: {
        email,
        firstName,
        lastName
      },
      metadata: {
        createdBy: userId,
        source: 'user-service'
      }
    };
 
    this.applyEvent(event);
    this.uncommittedEvents.push(event as DomainEvent);
  }
 
  async verifyEmail(userId?: string): Promise<void> {
    if (!this.state.id) {
      throw new Error('User does not exist');
    }
 
    if (this.state.emailVerified) {
      throw new Error('Email already verified');
    }
 
    const event: Omit<DomainEvent, 'eventId' | 'timestamp'> = {
      aggregateId: this.state.id,
      aggregateType: 'User',
      eventType: 'EmailVerified',
      eventVersion: this.state.version + 1,
      eventData: {
        verifiedAt: new Date()
      },
      metadata: {
        verifiedBy: userId
      }
    };
 
    this.applyEvent(event);
    this.uncommittedEvents.push(event as DomainEvent);
  }
 
  async deactivateUser(reason: string, userId?: string): Promise<void> {
    if (!this.state.id) {
      throw new Error('User does not exist');
    }
 
    if (!this.state.isActive) {
      throw new Error('User already deactivated');
    }
 
    const event: Omit<DomainEvent, 'eventId' | 'timestamp'> = {
      aggregateId: this.state.id,
      aggregateType: 'User',
      eventType: 'UserDeactivated',
      eventVersion: this.state.version + 1,
      eventData: {
        reason,
        deactivatedAt: new Date()
      },
      metadata: {
        deactivatedBy: userId
      }
    };
 
    this.applyEvent(event);
    this.uncommittedEvents.push(event as DomainEvent);
  }
 
  async updateProfile(firstName: string, lastName: string, userId?: string): Promise<void> {
    if (!this.state.id) {
      throw new Error('User does not exist');
    }
 
    const event: Omit<DomainEvent, 'eventId' | 'timestamp'> = {
      aggregateId: this.state.id,
      aggregateType: 'User',
      eventType: 'UserProfileUpdated',
      eventVersion: this.state.version + 1,
      eventData: {
        firstName,
        lastName,
        previousFirstName: this.state.firstName,
        previousLastName: this.state.lastName
      },
      metadata: {
        updatedBy: userId
      }
    };
 
    this.applyEvent(event);
    this.uncommittedEvents.push(event as DomainEvent);
  }
 
  private applyEvents(events: DomainEvent[]): void {
    events.forEach(event => this.applyEvent(event));
  }
 
  private applyEvent(event: Omit<DomainEvent, 'eventId' | 'timestamp'> | DomainEvent): void {
    switch (event.eventType) {
      case 'UserCreated':
        this.state.id = event.aggregateId;
        this.state.email = event.eventData.email;
        this.state.firstName = event.eventData.firstName;
        this.state.lastName = event.eventData.lastName;
        this.state.isActive = true;
        this.state.emailVerified = false;
        this.state.createdAt = new Date();
        break;
 
      case 'EmailVerified':
        this.state.emailVerified = true;
        break;
 
      case 'UserDeactivated':
        this.state.isActive = false;
        break;
 
      case 'UserProfileUpdated':
        this.state.firstName = event.eventData.firstName;
        this.state.lastName = event.eventData.lastName;
        break;
    }
 
    this.state.version = event.eventVersion;
  }
 
  async saveChanges(): Promise<void> {
    for (const event of this.uncommittedEvents) {
      await this.eventStore.appendEvent(event);
    }
 
    // Crear snapshot cada 10 eventos
    if (this.state.version % 10 === 0) {
      await this.eventStore.createSnapshot(this.state.id, this.state, this.state.version);
    }
 
    this.uncommittedEvents = [];
  }
 
  getState(): UserState {
    return { ...this.state };
  }
 
  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }
}

Event Bus Distribuido

// src/events/distributed-event-bus.ts
import { EventBus, EventSubscriber } from '@foxframework/core/events';
 
export interface DistributedEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  payload: any;
  metadata: {
    version: number;
    timestamp: Date;
    correlationId?: string;
    causationId?: string;
    userId?: string;
    source: string;
  };
}
 
export class DistributedEventBus {
  private eventBus: EventBus;
  private subscribers = new Map<string, EventSubscriber[]>();
 
  constructor(provider: 'memory' | 'redis' | 'rabbitmq' | 'kafka' = 'memory') {
    this.eventBus = new EventBus({
      provider,
      connection: this.getConnectionConfig(provider),
      options: {
        retryAttempts: 3,
        retryDelay: 1000,
        deadLetterQueue: true,
        enableMetrics: true
      }
    });
 
    this.setupErrorHandling();
  }
 
  private getConnectionConfig(provider: string) {
    switch (provider) {
      case 'redis':
        return {
          host: process.env.REDIS_HOST || 'localhost',
          port: parseInt(process.env.REDIS_PORT || '6379'),
          database: process.env.REDIS_EVENTS_DB || '2'
        };
      case 'rabbitmq':
        return {
          url: process.env.RABBITMQ_URL || 'amqp://localhost:5672',
          exchange: 'fox-events',
          exchangeType: 'topic'
        };
      case 'kafka':
        return {
          brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
          clientId: 'fox-framework',
          groupId: 'fox-events'
        };
      default:
        return {};
    }
  }
 
  async publish(event: DistributedEvent): Promise<void> {
    // Agregar metadatos adicionales
    const enrichedEvent: DistributedEvent = {
      ...event,
      metadata: {
        ...event.metadata,
        timestamp: new Date(),
        correlationId: event.metadata.correlationId || event.eventId,
        source: event.metadata.source || 'unknown'
      }
    };
 
    await this.eventBus.publish(enrichedEvent.eventType, enrichedEvent);
    console.log(`📡 Event published: ${enrichedEvent.eventType} (${enrichedEvent.eventId})`);
  }
 
  async subscribe(eventType: string, handler: (event: DistributedEvent) => Promise<void>): Promise<void> {
    const subscriber: EventSubscriber = {
      eventType,
      handler: async (data: any) => {
        try {
          console.log(`📥 Event received: ${eventType} (${data.eventId})`);
          await handler(data);
          console.log(`✅ Event processed: ${eventType} (${data.eventId})`);
        } catch (error) {
          console.error(`❌ Event processing failed: ${eventType} (${data.eventId})`, error);
          throw error; // Re-throw para que el event bus maneje el retry
        }
      }
    };
 
    if (!this.subscribers.has(eventType)) {
      this.subscribers.set(eventType, []);
    }
    this.subscribers.get(eventType)!.push(subscriber);
 
    await this.eventBus.subscribe(eventType, subscriber.handler);
    console.log(`🔔 Subscribed to event: ${eventType}`);
  }
 
  async subscribeToPattern(pattern: string, handler: (event: DistributedEvent) => Promise<void>): Promise<void> {
    await this.eventBus.subscribeToPattern(pattern, async (data: any) => {
      try {
        console.log(`📥 Pattern event received: ${pattern} -> ${data.eventType} (${data.eventId})`);
        await handler(data);
        console.log(`✅ Pattern event processed: ${pattern} -> ${data.eventType} (${data.eventId})`);
      } catch (error) {
        console.error(`❌ Pattern event processing failed: ${pattern} -> ${data.eventType}`, error);
        throw error;
      }
    });
 
    console.log(`🔔 Subscribed to pattern: ${pattern}`);
  }
 
  private setupErrorHandling(): void {
    this.eventBus.onError((error: Error, event?: any) => {
      console.error('🚨 Event bus error:', error);
      if (event) {
        console.error('🚨 Failed event:', event);
        // Aquí podrías enviar el evento a un dead letter queue
        // o a un sistema de monitoreo
      }
    });
  }
 
  async getMetrics(): Promise<any> {
    return this.eventBus.getMetrics();
  }
 
  async disconnect(): Promise<void> {
    await this.eventBus.disconnect();
    console.log('🔌 Event bus disconnected');
  }
}

Projections y Read Models

// src/projections/user-projection.ts
export interface UserProjection {
  id: string;
  email: string;
  fullName: string;
  isActive: boolean;
  emailVerified: boolean;
  createdAt: Date;
  lastUpdated: Date;
  totalOrders?: number;
  totalSpent?: number;
}
 
export class UserProjectionHandler {
  private projections = new Map<string, UserProjection>();
 
  constructor(private eventBus: DistributedEventBus) {
    this.setupSubscriptions();
  }
 
  private async setupSubscriptions(): Promise<void> {
    // Suscribirse a eventos de usuario
    await this.eventBus.subscribe('UserCreated', this.handleUserCreated.bind(this));
    await this.eventBus.subscribe('EmailVerified', this.handleEmailVerified.bind(this));
    await this.eventBus.subscribe('UserDeactivated', this.handleUserDeactivated.bind(this));
    await this.eventBus.subscribe('UserProfileUpdated', this.handleUserProfileUpdated.bind(this));
    
    // Suscribirse a eventos de órdenes para actualizar estadísticas
    await this.eventBus.subscribe('OrderCompleted', this.handleOrderCompleted.bind(this));
  }
 
  private async handleUserCreated(event: DistributedEvent): Promise<void> {
    const projection: UserProjection = {
      id: event.aggregateId,
      email: event.payload.email,
      fullName: `${event.payload.firstName} ${event.payload.lastName}`,
      isActive: true,
      emailVerified: false,
      createdAt: event.metadata.timestamp,
      lastUpdated: event.metadata.timestamp,
      totalOrders: 0,
      totalSpent: 0
    };
 
    this.projections.set(event.aggregateId, projection);
    console.log(`👤 User projection created: ${projection.email}`);
  }
 
  private async handleEmailVerified(event: DistributedEvent): Promise<void> {
    const projection = this.projections.get(event.aggregateId);
    if (projection) {
      projection.emailVerified = true;
      projection.lastUpdated = event.metadata.timestamp;
      console.log(`✅ User projection updated - email verified: ${projection.email}`);
    }
  }
 
  private async handleUserDeactivated(event: DistributedEvent): Promise<void> {
    const projection = this.projections.get(event.aggregateId);
    if (projection) {
      projection.isActive = false;
      projection.lastUpdated = event.metadata.timestamp;
      console.log(`🚫 User projection updated - deactivated: ${projection.email}`);
    }
  }
 
  private async handleUserProfileUpdated(event: DistributedEvent): Promise<void> {
    const projection = this.projections.get(event.aggregateId);
    if (projection) {
      projection.fullName = `${event.payload.firstName} ${event.payload.lastName}`;
      projection.lastUpdated = event.metadata.timestamp;
      console.log(`📝 User projection updated - profile: ${projection.email}`);
    }
  }
 
  private async handleOrderCompleted(event: DistributedEvent): Promise<void> {
    const projection = this.projections.get(event.payload.userId);
    if (projection) {
      projection.totalOrders = (projection.totalOrders || 0) + 1;
      projection.totalSpent = (projection.totalSpent || 0) + event.payload.total;
      projection.lastUpdated = event.metadata.timestamp;
      console.log(`💰 User projection updated - order stats: ${projection.email}`);
    }
  }
 
  getUserProjection(userId: string): UserProjection | null {
    return this.projections.get(userId) || null;
  }
 
  getAllUserProjections(): UserProjection[] {
    return Array.from(this.projections.values());
  }
 
  searchUsers(searchTerm: string): UserProjection[] {
    const term = searchTerm.toLowerCase();
    return Array.from(this.projections.values()).filter(user =>
      user.email.toLowerCase().includes(term) ||
      user.fullName.toLowerCase().includes(term)
    );
  }
 
  getUserStatistics(): {
    total: number;
    active: number;
    verified: number;
    totalSpent: number;
  } {
    const users = Array.from(this.projections.values());
    return {
      total: users.length,
      active: users.filter(u => u.isActive).length,
      verified: users.filter(u => u.emailVerified).length,
      totalSpent: users.reduce((sum, u) => sum + (u.totalSpent || 0), 0)
    };
  }
}

Command Handlers

// src/handlers/user-command-handlers.ts
import { CommandHandler, Command } from '../cqrs/command-handler';
import { UserAggregate } from '../domain/user-aggregate';
import { FoxEventStore } from '../events/event-store';
import { DistributedEventBus } from '../events/distributed-event-bus';
 
// Commands
export interface CreateUserCommand extends Command {
  commandType: 'CreateUser';
  payload: {
    email: string;
    firstName: string;
    lastName: string;
  };
}
 
export interface VerifyEmailCommand extends Command {
  commandType: 'VerifyEmail';
  payload: {
    userId: string;
  };
}
 
export interface UpdateUserProfileCommand extends Command {
  commandType: 'UpdateUserProfile';
  payload: {
    userId: string;
    firstName: string;
    lastName: string;
  };
}
 
export class CreateUserCommandHandler implements CommandHandler<CreateUserCommand> {
  constructor(
    private eventStore: FoxEventStore,
    private eventBus: DistributedEventBus
  ) {}
 
  async handle(command: CreateUserCommand): Promise<void> {
    // Verificar que el email no existe (simplificado para el ejemplo)
    const existingEvents = await this.eventStore.getEventsByType('UserCreated');
    const emailExists = existingEvents.some(e => e.eventData.email === command.payload.email);
    
    if (emailExists) {
      throw new Error(`User with email ${command.payload.email} already exists`);
    }
 
    // Crear agregado
    const userAggregate = new UserAggregate(this.eventStore);
    await userAggregate.createUser(
      command.payload.email,
      command.payload.firstName,
      command.payload.lastName,
      command.userId
    );
 
    // Guardar eventos
    await userAggregate.saveChanges();
 
    // Publicar eventos a otros servicios
    const uncommittedEvents = userAggregate.getUncommittedEvents();
    for (const event of uncommittedEvents) {
      await this.eventBus.publish({
        eventId: event.eventId,
        eventType: event.eventType,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        payload: event.eventData,
        metadata: {
          version: event.eventVersion,
          timestamp: event.timestamp,
          correlationId: command.commandId,
          userId: command.userId,
          source: 'user-service'
        }
      });
    }
  }
}
 
export class VerifyEmailCommandHandler implements CommandHandler<VerifyEmailCommand> {
  constructor(
    private eventStore: FoxEventStore,
    private eventBus: DistributedEventBus
  ) {}
 
  async handle(command: VerifyEmailCommand): Promise<void> {
    const userAggregate = await UserAggregate.load(this.eventStore, command.payload.userId);
    await userAggregate.verifyEmail(command.userId);
    await userAggregate.saveChanges();
 
    // Publicar eventos
    const uncommittedEvents = userAggregate.getUncommittedEvents();
    for (const event of uncommittedEvents) {
      await this.eventBus.publish({
        eventId: event.eventId,
        eventType: event.eventType,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        payload: event.eventData,
        metadata: {
          version: event.eventVersion,
          timestamp: event.timestamp,
          correlationId: command.commandId,
          userId: command.userId,
          source: 'user-service'
        }
      });
    }
  }
}

Notification Service Integration

// src/services/notification-event-handler.ts
export class NotificationEventHandler {
  constructor(private eventBus: DistributedEventBus) {
    this.setupSubscriptions();
  }
 
  private async setupSubscriptions(): Promise<void> {
    // Suscribirse a eventos que requieren notificaciones
    await this.eventBus.subscribe('UserCreated', this.handleUserCreated.bind(this));
    await this.eventBus.subscribe('EmailVerified', this.handleEmailVerified.bind(this));
    await this.eventBus.subscribe('OrderCompleted', this.handleOrderCompleted.bind(this));
    await this.eventBus.subscribe('OrderShipped', this.handleOrderShipped.bind(this));
  }
 
  private async handleUserCreated(event: DistributedEvent): Promise<void> {
    // Enviar email de bienvenida
    await this.sendNotification({
      type: 'email',
      to: event.payload.email,
      template: 'welcome',
      data: {
        firstName: event.payload.firstName,
        verificationUrl: `${process.env.FRONTEND_URL}/verify-email?token=...`
      },
      metadata: {
        eventId: event.eventId,
        userId: event.aggregateId
      }
    });
  }
 
  private async handleEmailVerified(event: DistributedEvent): Promise<void> {
    // Enviar confirmación de email verificado
    await this.sendNotification({
      type: 'email',
      to: event.payload.email,
      template: 'email-verified',
      data: {
        dashboardUrl: `${process.env.FRONTEND_URL}/dashboard`
      },
      metadata: {
        eventId: event.eventId,
        userId: event.aggregateId
      }
    });
  }
 
  private async handleOrderCompleted(event: DistributedEvent): Promise<void> {
    // Enviar confirmación de pedido
    await this.sendNotification({
      type: 'email',
      to: event.payload.customerEmail,
      template: 'order-confirmation',
      data: {
        orderNumber: event.payload.orderNumber,
        total: event.payload.total,
        items: event.payload.items,
        trackingUrl: `${process.env.FRONTEND_URL}/orders/${event.payload.orderNumber}/track`
      },
      metadata: {
        eventId: event.eventId,
        orderId: event.aggregateId,
        userId: event.payload.userId
      }
    });
  }
 
  private async handleOrderShipped(event: DistributedEvent): Promise<void> {
    // Enviar notificación de envío
    await this.sendNotification({
      type: 'email',
      to: event.payload.customerEmail,
      template: 'order-shipped',
      data: {
        orderNumber: event.payload.orderNumber,
        trackingNumber: event.payload.trackingNumber,
        estimatedDelivery: event.payload.estimatedDelivery,
        trackingUrl: event.payload.trackingUrl
      },
      metadata: {
        eventId: event.eventId,
        orderId: event.aggregateId,
        userId: event.payload.userId
      }
    });
 
    // También enviar notificación push si el usuario tiene la app
    await this.sendNotification({
      type: 'push',
      to: event.payload.userId,
      template: 'order-shipped-push',
      data: {
        title: 'Order Shipped!',
        body: `Your order ${event.payload.orderNumber} has been shipped`,
        orderNumber: event.payload.orderNumber
      },
      metadata: {
        eventId: event.eventId,
        orderId: event.aggregateId
      }
    });
  }
 
  private async sendNotification(notification: any): Promise<void> {
    // Publicar evento de notificación para que el servicio de notificaciones lo procese
    await this.eventBus.publish({
      eventId: `notif-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
      eventType: 'NotificationRequested',
      aggregateId: notification.metadata.userId || notification.metadata.orderId,
      aggregateType: 'Notification',
      payload: notification,
      metadata: {
        version: 1,
        timestamp: new Date(),
        source: 'notification-handler'
      }
    });
  }
}

Servidor Principal con Eventos

// src/server.ts
import { FoxFactory, Request, Response } from '@foxframework/core';
import { FoxEventStore } from './events/event-store';
import { DistributedEventBus } from './events/distributed-event-bus';
import { CommandBus } from './cqrs/command-handler';
import { QueryBus } from './cqrs/query-handler';
import { UserProjectionHandler } from './projections/user-projection';
import { CreateUserCommandHandler, VerifyEmailCommandHandler } from './handlers/user-command-handlers';
import { NotificationEventHandler } from './services/notification-event-handler';
 
class EventDrivenServer {
  private eventStore: FoxEventStore;
  private eventBus: DistributedEventBus;
  private commandBus: CommandBus;
  private queryBus: QueryBus;
  private userProjectionHandler: UserProjectionHandler;
  private notificationHandler: NotificationEventHandler;
 
  constructor() {
    // Inicializar sistema de eventos
    this.eventStore = new FoxEventStore(process.env.EVENT_STORE_PROVIDER as any || 'memory');
    this.eventBus = new DistributedEventBus(process.env.EVENT_BUS_PROVIDER as any || 'memory');
    
    // Inicializar CQRS
    this.commandBus = new CommandBus();
    this.queryBus = new QueryBus();
    
    // Inicializar projections y handlers
    this.userProjectionHandler = new UserProjectionHandler(this.eventBus);
    this.notificationHandler = new NotificationEventHandler(this.eventBus);
    
    this.setupCommandHandlers();
  }
 
  private setupCommandHandlers(): void {
    // Registrar command handlers
    this.commandBus.register('CreateUser', new CreateUserCommandHandler(this.eventStore, this.eventBus));
    this.commandBus.register('VerifyEmail', new VerifyEmailCommandHandler(this.eventStore, this.eventBus));
  }
 
  // POST /users
  createUser = async (req: Request, res: Response): Promise<void> => {
    try {
      const { email, firstName, lastName } = req.body;
 
      const command = {
        commandId: `cmd-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
        aggregateId: `user-${Date.now()}`,
        commandType: 'CreateUser' as const,
        payload: { email, firstName, lastName },
        timestamp: new Date(),
        userId: req.user?.id
      };
 
      await this.commandBus.execute(command);
 
      res.status(202).json({
        message: 'User creation initiated',
        commandId: command.commandId
      });
 
    } catch (error) {
      res.status(400).json({
        error: 'Command failed',
        message: error instanceof Error ? error.message : 'Unknown error'
      });
    }
  };
 
  // POST /users/:id/verify-email
  verifyEmail = async (req: Request, res: Response): Promise<void> => {
    try {
      const userId = req.params.id;
 
      const command = {
        commandId: `cmd-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
        aggregateId: userId,
        commandType: 'VerifyEmail' as const,
        payload: { userId },
        timestamp: new Date(),
        userId: req.user?.id
      };
 
      await this.commandBus.execute(command);
 
      res.json({
        message: 'Email verification initiated',
        commandId: command.commandId
      });
 
    } catch (error) {
      res.status(400).json({
        error: 'Command failed',
        message: error instanceof Error ? error.message : 'Unknown error'
      });
    }
  };
 
  // GET /users
  getUsers = async (req: Request, res: Response): Promise<void> => {
    const users = this.userProjectionHandler.getAllUserProjections();
    res.json({ data: users });
  };
 
  // GET /users/:id
  getUser = async (req: Request, res: Response): Promise<void> => {
    const userId = req.params.id;
    const user = this.userProjectionHandler.getUserProjection(userId);
    
    if (!user) {
      return res.status(404).json({
        error: 'User not found',
        message: `User with ID ${userId} not found`
      });
    }
 
    res.json({ data: user });
  };
 
  // GET /users/search
  searchUsers = async (req: Request, res: Response): Promise<void> => {
    const { q } = req.query;
    
    if (!q) {
      return res.status(400).json({
        error: 'Search term required',
        message: 'Query parameter "q" is required'
      });
    }
 
    const users = this.userProjectionHandler.searchUsers(q as string);
    res.json({ data: users });
  };
 
  // GET /analytics/users
  getUserAnalytics = async (req: Request, res: Response): Promise<void> => {
    const stats = this.userProjectionHandler.getUserStatistics();
    res.json({ data: stats });
  };
 
  // GET /events/metrics
  getEventMetrics = async (req: Request, res: Response): Promise<void> => {
    const metrics = await this.eventBus.getMetrics();
    res.json({ data: metrics });
  };
 
  createServer() {
    return FoxFactory.createServer({
      port: 3000,
      
      routes: [
        // Command endpoints (writes)
        { path: '/users', method: 'post', handler: this.createUser },
        { path: '/users/:id/verify-email', method: 'post', handler: this.verifyEmail },
        
        // Query endpoints (reads)
        { path: '/users', method: 'get', handler: this.getUsers },
        { path: '/users/:id', method: 'get', handler: this.getUser },
        { path: '/users/search', method: 'get', handler: this.searchUsers },
        { path: '/analytics/users', method: 'get', handler: this.getUserAnalytics },
        { path: '/events/metrics', method: 'get', handler: this.getEventMetrics },
        
        // Health check
        {
          path: '/health',
          method: 'get',
          handler: (req: Request, res: Response) => {
            res.json({
              status: 'healthy',
              timestamp: new Date().toISOString(),
              eventStore: 'connected',
              eventBus: 'connected'
            });
          }
        }
      ]
    });
  }
 
  async shutdown(): Promise<void> {
    await this.eventBus.disconnect();
    console.log('🔌 Event-driven server shutdown complete');
  }
}
 
// Inicializar y arrancar servidor
const server = new EventDrivenServer();
const app = server.createServer();
 
app.start().then(() => {
  console.log('🦊 Event-driven server running on http://localhost:3000');
  console.log('📡 Event system initialized');
  console.log('📚 Available endpoints:');
  console.log('  POST /users              - Create user (command)');
  console.log('  POST /users/:id/verify-email - Verify email (command)');
  console.log('  GET  /users              - List users (query)');
  console.log('  GET  /users/:id          - Get user (query)');
  console.log('  GET  /users/search       - Search users (query)');
  console.log('  GET  /analytics/users    - User analytics (query)');
  console.log('  GET  /events/metrics     - Event metrics (query)');
});
 
// Graceful shutdown
process.on('SIGTERM', () => server.shutdown());
process.on('SIGINT', () => server.shutdown());

Variables de Entorno

# .env
# Event Store Configuration
EVENT_STORE_PROVIDER=redis  # memory, redis, postgresql, mongodb
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_EVENTS_DB=1
 
# Event Bus Configuration  
EVENT_BUS_PROVIDER=redis  # memory, redis, rabbitmq, kafka
REDIS_EVENTS_DB=2
 
# RabbitMQ (opcional)
RABBITMQ_URL=amqp://localhost:5672
 
# Kafka (opcional)
KAFKA_BROKERS=localhost:9092
 
# Application
NODE_ENV=development
FRONTEND_URL=http://localhost:3001

Características Destacadas

  • Event Sourcing: Almacenamiento completo del historial de eventos
  • CQRS: Separación de comandos y consultas para escalabilidad
  • Event Bus Distribuido: Mensajería asíncrona entre servicios
  • Projections: Vistas materializadas para consultas optimizadas
  • Multi-Provider Support: Memory, Redis, RabbitMQ, Kafka
  • Snapshots: Optimización de carga de agregados
  • Circuit Breakers: Resistencia a fallos en evento distribuidos
  • Correlation IDs: Trazabilidad de eventos relacionados
  • Dead Letter Queues: Manejo de eventos fallidos
  • Event Replay: Capacidad de reproducir eventos históricos