Event-Driven Architecture Patterns: From Event Sourcing to Sagas

Comprehensive guide to event-driven architecture patterns including Event Sourcing, CQRS, Saga patterns, and their implementation with Kafka, EventStoreDB, and cloud-native technologies.

Gonnect Team
January 10, 2025 · 18 min read
Apache Kafka · EventStoreDB · CQRS · Saga Pattern · Domain Events · Axon Framework

Introduction to Event-Driven Architecture

Event-Driven Architecture (EDA) represents a paradigm shift in how we design distributed systems. Instead of synchronous request-response patterns, EDA centers around the production, detection, and reaction to events. This approach enables loose coupling, scalability, and resilience in complex distributed systems.

Key Insight: In EDA, components communicate through events rather than direct calls, enabling systems that are more resilient, scalable, and adaptable to change.

Why Event-Driven?

Traditional Architecture     | Event-Driven Architecture
-----------------------------|-------------------------------
Synchronous coupling         | Asynchronous, loosely coupled
Point-to-point communication | Publish-subscribe model
Tight service dependencies   | Services evolve independently
Difficult to scale           | Natural horizontal scaling
Single point of failure      | Resilient to failures
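The contrast above is easiest to see in code. Below is a minimal in-memory event bus (a sketch; `EventBus` and the service names are purely illustrative): the publisher never references its subscribers, which is exactly the loose coupling the table describes.

```typescript
// Minimal publish-subscribe bus. Subscribers can be added or removed
// without the publisher changing at all.
type Handler = (payload: unknown) => void;

class EventBus {
  private handlers = new Map<string, Handler[]>();

  subscribe(eventType: string, handler: Handler): void {
    const list = this.handlers.get(eventType) ?? [];
    list.push(handler);
    this.handlers.set(eventType, list);
  }

  publish(eventType: string, payload: unknown): void {
    // The publisher knows nothing about who is listening
    for (const handler of this.handlers.get(eventType) ?? []) {
      handler(payload);
    }
  }
}

const bus = new EventBus();
const notified: string[] = [];
bus.subscribe('OrderCreated', () => notified.push('email-service'));
bus.subscribe('OrderCreated', () => notified.push('analytics-service'));
bus.publish('OrderCreated', { orderId: 'o-1' });
```

Adding a third consumer later is a one-line `subscribe` call; the publishing side is untouched.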

Core Event-Driven Patterns

Event Sourcing

Event Sourcing stores the state of an entity as a sequence of events rather than the current state. Every change to state is captured as an event, providing a complete audit trail and enabling temporal queries.

[Diagram: Event Sourcing Architecture]

Event Sourcing Benefits

Benefit              | Description
---------------------|------------------------------------------------
Complete Audit Trail | Every change is recorded as an immutable event
Temporal Queries     | Query state at any point in time
Event Replay         | Rebuild state by replaying events
Debugging            | Understand exactly what happened and when
Analytics            | Rich historical data for analysis
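The Event Replay benefit is worth making concrete: current state is a left fold over the event history, and replaying only a prefix of that history yields the state at an earlier point in time (the basis for temporal queries). A minimal sketch with hypothetical account events:

```typescript
// Rebuilding state by replaying events: a left fold over history.
interface AccountEvent {
  type: 'Deposited' | 'Withdrawn';
  amount: number;
}

function replay(events: AccountEvent[]): number {
  return events.reduce(
    (balance, e) =>
      e.type === 'Deposited' ? balance + e.amount : balance - e.amount,
    0
  );
}

const history: AccountEvent[] = [
  { type: 'Deposited', amount: 100 },
  { type: 'Withdrawn', amount: 30 },
  { type: 'Deposited', amount: 5 },
];

const balance = replay(history); // current state
const balanceAfterTwoEvents = replay(history.slice(0, 2)); // temporal query
```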

Event Store Implementation

interface DomainEvent {
  eventId: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  eventData: Record<string, unknown>;
  metadata: {
    timestamp: Date;
    version: number;
    correlationId: string;
    causationId: string;
    userId?: string;
  };
}

class ConcurrencyError extends Error {}

class EventStore {
  private events: Map<string, DomainEvent[]> = new Map();
  private subscribers: Array<(event: DomainEvent) => Promise<void>> = [];

  subscribe(handler: (event: DomainEvent) => Promise<void>): void {
    this.subscribers.push(handler);
  }

  async appendEvents(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const existingEvents = this.events.get(aggregateId) || [];
    const currentVersion = existingEvents.length;

    // Optimistic concurrency check
    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but found ${currentVersion}`
      );
    }

    // Append new events with incremented versions
    const newEvents = events.map((event, index) => ({
      ...event,
      metadata: {
        ...event.metadata,
        version: currentVersion + index + 1,
      },
    }));

    this.events.set(aggregateId, [...existingEvents, ...newEvents]);

    // Publish events to subscribers
    await this.publishEvents(newEvents);
  }

  async getEvents(
    aggregateId: string,
    fromVersion?: number
  ): Promise<DomainEvent[]> {
    const events = this.events.get(aggregateId) || [];
    if (fromVersion !== undefined) {
      return events.filter((e) => e.metadata.version > fromVersion);
    }
    return events;
  }

  private async publishEvents(events: DomainEvent[]): Promise<void> {
    // Deliver committed events to registered subscribers in order
    for (const event of events) {
      for (const handler of this.subscribers) {
        await handler(event);
      }
    }
  }
}

Aggregate Implementation with Event Sourcing

import { v4 as uuid } from 'uuid';

enum OrderStatus {
  Draft = 'Draft',
  Submitted = 'Submitted',
}

interface OrderItem {
  productId: string;
  quantity: number;
  price: number;
}

abstract class EventSourcedAggregate {
  protected uncommittedEvents: DomainEvent[] = [];
  private _version: number = 0;

  // Read-only so repositories can use the version for optimistic
  // concurrency checks without being able to mutate it
  get version(): number {
    return this._version;
  }

  abstract apply(event: DomainEvent): void;

  protected raise(event: DomainEvent): void {
    this.apply(event);
    this.uncommittedEvents.push(event);
  }

  loadFromHistory(events: DomainEvent[]): void {
    events.forEach((event) => {
      this.apply(event);
      this._version = event.metadata.version;
    });
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = [];
  }
}

class Order extends EventSourcedAggregate {
  private id: string = '';
  private status: OrderStatus = OrderStatus.Draft;
  private items: OrderItem[] = [];
  private total: number = 0;

  static create(orderId: string, customerId: string): Order {
    const order = new Order();
    order.raise({
      eventId: uuid(),
      aggregateId: orderId,
      aggregateType: 'Order',
      eventType: 'OrderCreated',
      eventData: { orderId, customerId },
      metadata: {
        timestamp: new Date(),
        version: 0,
        correlationId: uuid(),
        causationId: uuid(),
      },
    });
    return order;
  }

  addItem(productId: string, quantity: number, price: number): void {
    if (this.status !== OrderStatus.Draft) {
      throw new Error('Cannot add items to non-draft order');
    }
    this.raise({
      eventId: uuid(),
      aggregateId: this.id,
      aggregateType: 'Order',
      eventType: 'ItemAdded',
      eventData: { productId, quantity, price },
      metadata: {
        timestamp: new Date(),
        version: this.version,
        correlationId: uuid(),
        causationId: uuid(),
      },
    });
  }

  apply(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.id = event.eventData.orderId as string;
        this.status = OrderStatus.Draft;
        break;
      case 'ItemAdded': {
        // Block scope keeps the lexical declaration local to this case
        const item = event.eventData as unknown as OrderItem;
        this.items.push(item);
        this.total += item.quantity * item.price;
        break;
      }
      case 'OrderSubmitted':
        this.status = OrderStatus.Submitted;
        break;
    }
  }
}

CQRS Pattern

Command Query Responsibility Segregation (CQRS) separates read and write operations into different models, optimized for their specific use cases.

[Diagram: CQRS + Event Sourcing]

CQRS Architecture Benefits

Aspect       | Write Side                | Read Side
-------------|---------------------------|------------------------------
Model        | Domain-rich aggregates    | Denormalized views
Optimization | Transactional consistency | Query performance
Scaling      | Scale for writes          | Scale for reads independently
Storage      | Event store / relational  | Document store / cache

Command Handler Implementation

interface Command {
  commandId: string;
  aggregateId: string;
  timestamp: Date;
}

interface CreateOrderCommand extends Command {
  customerId: string;
  items: Array<{
    productId: string;
    quantity: number;
    price: number;
  }>;
}

class OrderCommandHandler {
  constructor(
    private eventStore: EventStore,
    private eventPublisher: EventPublisher
  ) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    // A create command must target a fresh aggregate; reject duplicates
    const existingEvents = await this.eventStore.getEvents(command.aggregateId);
    if (existingEvents.length > 0) {
      throw new Error(`Order ${command.aggregateId} already exists`);
    }

    // Execute command
    const order = Order.create(command.aggregateId, command.customerId);
    command.items.forEach((item) =>
      order.addItem(item.productId, item.quantity, item.price)
    );

    // Persist new events (expected version is 0 for a new aggregate)
    const uncommittedEvents = order.getUncommittedEvents();
    await this.eventStore.appendEvents(
      command.aggregateId,
      uncommittedEvents,
      0
    );
    order.clearUncommittedEvents();

    // Publish events for read model updates
    await this.eventPublisher.publishAll(uncommittedEvents);
  }
}

Read Model Projections

interface OrderReadModel {
  orderId: string;
  customerId: string;
  customerName: string;
  status: string;
  items: Array<{
    productId: string;
    productName: string;
    quantity: number;
    price: number;
    total: number;
  }>;
  subtotal: number;
  tax: number;
  total: number;
  createdAt: Date;
  updatedAt: Date;
}

class OrderProjection {
  constructor(
    private readModelStore: ReadModelStore,
    private customerService: CustomerService,
    private productService: ProductService
  ) {}

  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    const customer = await this.customerService.getCustomer(event.customerId);

    const readModel: OrderReadModel = {
      orderId: event.aggregateId,
      customerId: event.customerId,
      customerName: customer.name,
      status: 'Draft',
      items: [],
      subtotal: 0,
      tax: 0,
      total: 0,
      createdAt: event.metadata.timestamp,
      updatedAt: event.metadata.timestamp,
    };

    await this.readModelStore.upsert('orders', event.aggregateId, readModel);
  }

  async handleItemAdded(event: ItemAddedEvent): Promise<void> {
    const order = await this.readModelStore.get<OrderReadModel>(
      'orders',
      event.aggregateId
    );
    if (!order) {
      throw new Error(`Order read model not found: ${event.aggregateId}`);
    }

    const product = await this.productService.getProduct(event.productId);

    const itemTotal = event.quantity * event.price;
    order.items.push({
      productId: event.productId,
      productName: product.name,
      quantity: event.quantity,
      price: event.price,
      total: itemTotal,
    });

    order.subtotal += itemTotal;
    order.tax = order.subtotal * 0.1; // flat 10% tax rate for illustration
    order.total = order.subtotal + order.tax;
    order.updatedAt = event.metadata.timestamp;

    await this.readModelStore.upsert('orders', event.aggregateId, order);
  }
}

Saga Patterns

Sagas coordinate transactions across multiple services in a distributed system. There are two main approaches: Orchestration and Choreography.

Saga Orchestration

In the orchestration pattern, a central coordinator (orchestrator) manages the saga flow, telling each participant what to do.

[Diagram: Saga Orchestration Pattern]

Saga Choreography

In the choreography pattern, each service publishes events and reacts to events from other services without a central coordinator.

[Diagram: Saga Choreography Pattern]
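A minimal choreography sketch (all names are illustrative): each service subscribes to the broker and emits its own event when it reacts, so the flow emerges from chained events rather than from a coordinator.

```typescript
// Choreography: no central orchestrator. Each "service" is a listener
// that reacts to one event type and emits the next.
type SagaEvent = { type: string; orderId: string };
type Listener = (e: SagaEvent) => void;

class Broker {
  private listeners: Listener[] = [];
  on(listener: Listener): void {
    this.listeners.push(listener);
  }
  emit(event: SagaEvent): void {
    this.listeners.forEach((l) => l(event));
  }
}

const broker = new Broker();
const log: string[] = [];

// Inventory service: reacts to OrderPlaced, emits InventoryReserved
broker.on((e) => {
  if (e.type === 'OrderPlaced') {
    log.push('inventory reserved');
    broker.emit({ type: 'InventoryReserved', orderId: e.orderId });
  }
});

// Payment service: reacts to InventoryReserved, emits PaymentCompleted
broker.on((e) => {
  if (e.type === 'InventoryReserved') {
    log.push('payment charged');
    broker.emit({ type: 'PaymentCompleted', orderId: e.orderId });
  }
});

broker.emit({ type: 'OrderPlaced', orderId: 'o-1' });
```

Note how the order flow is implicit: to change the sequence you change which events each service listens to, not a central definition.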

Pattern Comparison

Aspect                  | Orchestration                    | Choreography
------------------------|----------------------------------|----------------------------
Coordination            | Central orchestrator             | Distributed events
Coupling                | Services coupled to orchestrator | Loose coupling
Complexity              | Simpler flow understanding       | Complex event chains
Single Point of Failure | Orchestrator is critical         | No single point
Monitoring              | Centralized visibility           | Distributed tracing needed

Orchestrator Implementation

interface SagaStep {
  name: string;
  execute: (context: SagaContext) => Promise<void>;
  compensate: (context: SagaContext) => Promise<void>;
}

class OrderSagaOrchestrator {
  constructor(
    private inventoryService: InventoryService,
    private paymentService: PaymentService,
    private shippingService: ShippingService
  ) {}

  private steps: SagaStep[] = [
    {
      name: 'ReserveInventory',
      execute: async (ctx) => {
        const result = await this.inventoryService.reserve(
          ctx.orderId,
          ctx.items
        );
        ctx.inventoryReservationId = result.reservationId;
      },
      compensate: async (ctx) => {
        if (ctx.inventoryReservationId) {
          await this.inventoryService.cancelReservation(
            ctx.inventoryReservationId
          );
        }
      },
    },
    {
      name: 'ProcessPayment',
      execute: async (ctx) => {
        const result = await this.paymentService.charge(
          ctx.customerId,
          ctx.total
        );
        ctx.paymentId = result.paymentId;
      },
      compensate: async (ctx) => {
        if (ctx.paymentId) {
          await this.paymentService.refund(ctx.paymentId);
        }
      },
    },
    {
      name: 'CreateShipment',
      execute: async (ctx) => {
        const result = await this.shippingService.createShipment(
          ctx.orderId,
          ctx.shippingAddress
        );
        ctx.shipmentId = result.shipmentId;
      },
      compensate: async (ctx) => {
        if (ctx.shipmentId) {
          await this.shippingService.cancelShipment(ctx.shipmentId);
        }
      },
    },
  ];

  async execute(context: SagaContext): Promise<SagaResult> {
    const completedSteps: SagaStep[] = [];

    try {
      for (const step of this.steps) {
        await step.execute(context);
        completedSteps.push(step);
      }
      return { success: true, context };
    } catch (error) {
      // Compensate in reverse order
      for (const step of completedSteps.reverse()) {
        try {
          await step.compensate(context);
        } catch (compensateError) {
          // Log compensation failure
          console.error(`Compensation failed for ${step.name}`, compensateError);
        }
      }
      return { success: false, error, context };
    }
  }
}

Event-Driven Messaging with Kafka

Apache Kafka provides the backbone for event-driven systems with durable, scalable message streaming.

Kafka Producer Configuration

import { Kafka, Producer, CompressionTypes } from 'kafkajs';

class EventPublisher {
  private producer: Producer;

  constructor(private kafka: Kafka) {
    this.producer = kafka.producer({
      idempotent: true,
      // kafkajs requires maxInFlightRequests of at most 1 when
      // idempotent delivery is enabled
      maxInFlightRequests: 1,
      transactionalId: 'order-service-producer',
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async publish(event: DomainEvent): Promise<void> {
    const topic = `${event.aggregateType.toLowerCase()}-events`;

    await this.producer.send({
      topic,
      messages: [
        {
          key: event.aggregateId,
          value: JSON.stringify(event),
          headers: {
            eventType: event.eventType,
            correlationId: event.metadata.correlationId,
            timestamp: event.metadata.timestamp.toISOString(),
          },
        },
      ],
      compression: CompressionTypes.GZIP,
    });
  }

  async publishTransactional(events: DomainEvent[]): Promise<void> {
    const transaction = await this.producer.transaction();

    try {
      for (const event of events) {
        const topic = `${event.aggregateType.toLowerCase()}-events`;
        await transaction.send({
          topic,
          messages: [
            {
              key: event.aggregateId,
              value: JSON.stringify(event),
            },
          ],
        });
      }
      await transaction.commit();
    } catch (error) {
      await transaction.abort();
      throw error;
    }
  }
}

Kafka Consumer with At-Least-Once Processing

The consumer below provides at-least-once delivery, routing failed events to a dead-letter topic. True exactly-once processing would additionally require idempotent handlers or a transactional read-process-write loop.

import { Kafka, Consumer, Producer } from 'kafkajs';

class EventConsumer {
  private consumer: Consumer;

  constructor(
    private kafka: Kafka,
    private groupId: string,
    // Producer is used to forward failed events to the dead-letter topic
    private producer: Producer
  ) {
    this.consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
      maxBytesPerPartition: 1048576, // 1 MiB
    });
  }

  async subscribe(topics: string[]): Promise<void> {
    await this.consumer.connect();

    for (const topic of topics) {
      await this.consumer.subscribe({
        topic,
        fromBeginning: false,
      });
    }
  }

  async consume(handler: EventHandler): Promise<void> {
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const event: DomainEvent = JSON.parse(message.value!.toString());

        try {
          await handler.handle(event);
        } catch (error) {
          // Handle processing error
          await this.handleError(event, error);
        }
      },
    });
  }

  private async handleError(event: DomainEvent, error: Error): Promise<void> {
    // Send to dead letter queue
    await this.producer.send({
      topic: 'dead-letter-queue',
      messages: [
        {
          key: event.aggregateId,
          value: JSON.stringify({
            originalEvent: event,
            error: error.message,
            timestamp: new Date().toISOString(),
          }),
        },
      ],
    });
  }
}

Outbox Pattern

The Outbox pattern ensures reliable event publishing by storing events in the same transaction as the aggregate update.

Outbox Implementation

class OutboxRepository {
  constructor(private db: Database) {}

  async saveWithOutbox(
    aggregate: EventSourcedAggregate,
    transaction: Transaction
  ): Promise<void> {
    const events = aggregate.getUncommittedEvents();

    // Save aggregate state (saveAggregate persists the state row in the
    // same transaction; its implementation is elided here)
    await this.saveAggregate(aggregate, transaction);

    // Save events to outbox table
    for (const event of events) {
      await transaction.execute(
        `INSERT INTO outbox (event_id, aggregate_id, event_type, payload, created_at)
         VALUES ($1, $2, $3, $4, $5)`,
        [
          event.eventId,
          event.aggregateId,
          event.eventType,
          JSON.stringify(event),
          new Date(),
        ]
      );
    }
  }
}

class OutboxProcessor {
  constructor(
    private db: Database,
    private eventPublisher: EventPublisher
  ) {}

  async processOutbox(): Promise<void> {
    const events = await this.db.query(
      `SELECT * FROM outbox WHERE published_at IS NULL
       ORDER BY created_at ASC LIMIT 100`
    );

    for (const row of events) {
      try {
        const event = JSON.parse(row.payload);
        await this.eventPublisher.publish(event);

        await this.db.execute(
          `UPDATE outbox SET published_at = $1 WHERE event_id = $2`,
          [new Date(), row.event_id]
        );
      } catch (error) {
        await this.db.execute(
          `UPDATE outbox SET retry_count = retry_count + 1,
           last_error = $1 WHERE event_id = $2`,
          [error.message, row.event_id]
        );
      }
    }
  }
}

Best Practices

1. Event Design

  • Immutability: Events should never be modified once published
  • Self-contained: Include all necessary information in the event
  • Versioning: Plan for schema evolution from the start
  • Naming: Use past-tense verbs (OrderCreated, PaymentProcessed)
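Schema versioning is commonly handled by upcasting: old event versions are translated into the current shape at read time, with defaults supplied for fields that did not exist yet. A sketch with hypothetical address events:

```typescript
// Upcasting old event payloads to the current schema at read time.
interface AddressV1 {
  street: string; // v1: only a street field
}
interface AddressV2 {
  street: string;
  country: string; // v2 adds a country field
}

type StoredEvent = { version: number; data: AddressV1 | AddressV2 };

function upcast(e: StoredEvent): AddressV2 {
  if (e.version === 1) {
    // Supply a default for the field introduced in v2
    return { ...(e.data as AddressV1), country: 'unknown' };
  }
  return e.data as AddressV2;
}

const old: StoredEvent = { version: 1, data: { street: '1 Main St' } };
const current = upcast(old);
```

Because events are immutable, the stored v1 payload is never rewritten; only the in-memory representation is upgraded.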

2. Idempotency

  • Deduplication: Track processed event IDs to prevent duplicate processing
  • Idempotent Handlers: Design handlers that produce the same result regardless of how many times they're called
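A deduplicating handler can be sketched as follows; in production the processed-ID set would live in durable storage (for example a table keyed by event ID), not in memory.

```typescript
// Idempotent consumer: apply each event ID at most once,
// so broker redeliveries do not repeat side effects.
class IdempotentHandler {
  private processed = new Set<string>();
  public applied = 0;

  handle(eventId: string): void {
    if (this.processed.has(eventId)) return; // duplicate: skip silently
    this.processed.add(eventId);
    this.applied += 1; // stands in for the real side effect
  }
}

const handler = new IdempotentHandler();
handler.handle('evt-1');
handler.handle('evt-1'); // redelivery of the same event
handler.handle('evt-2');
```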

3. Error Handling

  • Dead Letter Queues: Route failed events for manual inspection
  • Retry Policies: Implement exponential backoff for transient failures
  • Circuit Breakers: Prevent cascade failures in distributed systems
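A retry policy with exponential backoff might look like the sketch below (parameter values are illustrative): the delay doubles per attempt up to a cap, and the last error is rethrown once attempts are exhausted.

```typescript
// Exponential backoff: delay doubles per attempt, capped at maxMs.
function backoffDelay(attempt: number, baseMs = 100, maxMs = 10000): number {
  return Math.min(baseMs * Math.pow(2, attempt), maxMs);
}

async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 5
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Wait before the next attempt
      await new Promise((resolve) => setTimeout(resolve, backoffDelay(attempt)));
    }
  }
  throw lastError;
}
```

Adding random jitter to the delay is a common refinement to avoid synchronized retry storms across consumers.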

4. Monitoring

  • Event Tracing: Use correlation IDs to trace events across services
  • Metrics: Track event throughput, latency, and error rates
  • Alerting: Set up alerts for lag, failures, and SLA breaches
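Correlation and causation IDs are typically propagated like this: a derived event keeps the original correlationId (identifying the whole business transaction) and records the triggering event's ID as its causationId. The field names follow the DomainEvent metadata defined earlier.

```typescript
// Tracing metadata propagation across an event chain.
interface TraceMeta {
  eventId: string;
  correlationId: string;
  causationId: string;
}

function deriveMetadata(trigger: TraceMeta, newEventId: string): TraceMeta {
  return {
    eventId: newEventId,
    correlationId: trigger.correlationId, // same business transaction
    causationId: trigger.eventId, // the event that directly caused this one
  };
}
```

Following correlationId reconstructs the whole flow; following causationId links reconstructs the exact cause-effect chain within it.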

Conclusion

Event-Driven Architecture provides a powerful foundation for building scalable, resilient distributed systems. The key patterns covered include:

  1. Event Sourcing: Store state as a sequence of events for complete audit trails
  2. CQRS: Separate read and write models for optimized performance
  3. Saga Orchestration: Centralized coordination of distributed transactions
  4. Saga Choreography: Decentralized event-driven coordination
  5. Outbox Pattern: Reliable event publishing with transactional guarantees

Choose the right patterns based on your consistency requirements, team expertise, and operational capabilities. Start simple and evolve your architecture as needs grow.

Further Reading

For complete implementation examples, visit the EventDrivenPlatform GitHub repository.