Event-Driven Architecture Patterns: From Event Sourcing to Sagas
Comprehensive guide to event-driven architecture patterns including Event Sourcing, CQRS, Saga patterns, and their implementation with Kafka, EventStoreDB, and cloud-native technologies.
Table of Contents
Introduction to Event-Driven Architecture
Event-Driven Architecture (EDA) represents a paradigm shift in how we design distributed systems. Instead of synchronous request-response patterns, EDA centers around the production, detection, and reaction to events. This approach enables loose coupling, scalability, and resilience in complex distributed systems.
Key Insight: In EDA, components communicate through events rather than direct calls, enabling systems that are more resilient, scalable, and adaptable to change.
Why Event-Driven?
| Traditional Architecture | Event-Driven Architecture |
|---|---|
| Synchronous coupling | Asynchronous, loosely coupled |
| Point-to-point communication | Publish-subscribe model |
| Tight service dependencies | Services evolve independently |
| Difficult to scale | Natural horizontal scaling |
| Single point of failure | Resilient to failures |
Core Event-Driven Patterns
Event Sourcing
Event Sourcing stores the state of an entity as a sequence of events rather than the current state. Every change to state is captured as an event, providing a complete audit trail and enabling temporal queries.
Event Sourcing Architecture
Event Sourcing Benefits
| Benefit | Description |
|---|---|
| Complete Audit Trail | Every change is recorded as an immutable event |
| Temporal Queries | Query state at any point in time |
| Event Replay | Rebuild state by replaying events |
| Debugging | Understand exactly what happened and when |
| Analytics | Rich historical data for analysis |
Event Store Implementation
interface DomainEvent {
eventId: string;
aggregateId: string;
aggregateType: string;
eventType: string;
eventData: Record<string, unknown>;
metadata: {
timestamp: Date;
version: number;
correlationId: string;
causationId: string;
userId?: string;
};
}
class EventStore {
private events: Map<string, DomainEvent[]> = new Map();
async appendEvents(
aggregateId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
const existingEvents = this.events.get(aggregateId) || [];
const currentVersion = existingEvents.length;
// Optimistic concurrency check
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but found ${currentVersion}`
);
}
// Append new events with incremented versions
const newEvents = events.map((event, index) => ({
...event,
metadata: {
...event.metadata,
version: currentVersion + index + 1,
},
}));
this.events.set(aggregateId, [...existingEvents, ...newEvents]);
// Publish events to subscribers
await this.publishEvents(newEvents);
}
async getEvents(
aggregateId: string,
fromVersion?: number
): Promise<DomainEvent[]> {
const events = this.events.get(aggregateId) || [];
if (fromVersion !== undefined) {
return events.filter((e) => e.metadata.version > fromVersion);
}
return events;
}
}
Aggregate Implementation with Event Sourcing
abstract class EventSourcedAggregate {
protected uncommittedEvents: DomainEvent[] = [];
protected version: number = 0;
abstract apply(event: DomainEvent): void;
protected raise(event: DomainEvent): void {
this.apply(event);
this.uncommittedEvents.push(event);
}
loadFromHistory(events: DomainEvent[]): void {
events.forEach((event) => {
this.apply(event);
this.version = event.metadata.version;
});
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
}
class Order extends EventSourcedAggregate {
private id: string = '';
private status: OrderStatus = OrderStatus.Draft;
private items: OrderItem[] = [];
private total: number = 0;
static create(orderId: string, customerId: string): Order {
const order = new Order();
order.raise({
eventId: uuid(),
aggregateId: orderId,
aggregateType: 'Order',
eventType: 'OrderCreated',
eventData: { orderId, customerId },
metadata: {
timestamp: new Date(),
version: 0,
correlationId: uuid(),
causationId: uuid(),
},
});
return order;
}
addItem(productId: string, quantity: number, price: number): void {
if (this.status !== OrderStatus.Draft) {
throw new Error('Cannot add items to non-draft order');
}
this.raise({
eventId: uuid(),
aggregateId: this.id,
aggregateType: 'Order',
eventType: 'ItemAdded',
eventData: { productId, quantity, price },
metadata: {
timestamp: new Date(),
version: this.version,
correlationId: uuid(),
causationId: uuid(),
},
});
}
apply(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderCreated':
this.id = event.eventData.orderId as string;
this.status = OrderStatus.Draft;
break;
case 'ItemAdded':
const item = event.eventData as OrderItem;
this.items.push(item);
this.total += item.quantity * item.price;
break;
case 'OrderSubmitted':
this.status = OrderStatus.Submitted;
break;
}
}
}
CQRS Pattern
Command Query Responsibility Segregation (CQRS) separates read and write operations into different models, optimized for their specific use cases.
CQRS + Event Sourcing
CQRS Architecture Benefits
| Aspect | Write Side | Read Side |
|---|---|---|
| Model | Domain-rich aggregates | Denormalized views |
| Optimization | Transactional consistency | Query performance |
| Scaling | Scale for writes | Scale for reads independently |
| Storage | Event store / relational | Document store / cache |
Command Handler Implementation
interface Command {
commandId: string;
aggregateId: string;
timestamp: Date;
}
interface CreateOrderCommand extends Command {
customerId: string;
items: Array<{
productId: string;
quantity: number;
price: number;
}>;
}
class OrderCommandHandler {
constructor(
private eventStore: EventStore,
private eventPublisher: EventPublisher
) {}
async handle(command: CreateOrderCommand): Promise<void> {
// Load aggregate from event store
const events = await this.eventStore.getEvents(command.aggregateId);
const order = new Order();
if (events.length > 0) {
order.loadFromHistory(events);
}
// Execute command
const newOrder = Order.create(command.aggregateId, command.customerId);
command.items.forEach((item) =>
newOrder.addItem(item.productId, item.quantity, item.price)
);
// Persist new events
const uncommittedEvents = newOrder.getUncommittedEvents();
await this.eventStore.appendEvents(
command.aggregateId,
uncommittedEvents,
order.version
);
// Publish events for read model updates
await this.eventPublisher.publishAll(uncommittedEvents);
}
}
Read Model Projections
interface OrderReadModel {
orderId: string;
customerId: string;
customerName: string;
status: string;
items: Array<{
productId: string;
productName: string;
quantity: number;
price: number;
total: number;
}>;
subtotal: number;
tax: number;
total: number;
createdAt: Date;
updatedAt: Date;
}
class OrderProjection {
constructor(
private readModelStore: ReadModelStore,
private customerService: CustomerService,
private productService: ProductService
) {}
async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
const customer = await this.customerService.getCustomer(event.customerId);
const readModel: OrderReadModel = {
orderId: event.aggregateId,
customerId: event.customerId,
customerName: customer.name,
status: 'Draft',
items: [],
subtotal: 0,
tax: 0,
total: 0,
createdAt: event.metadata.timestamp,
updatedAt: event.metadata.timestamp,
};
await this.readModelStore.upsert('orders', event.aggregateId, readModel);
}
async handleItemAdded(event: ItemAddedEvent): Promise<void> {
const order = await this.readModelStore.get<OrderReadModel>(
'orders',
event.aggregateId
);
const product = await this.productService.getProduct(event.productId);
const itemTotal = event.quantity * event.price;
order.items.push({
productId: event.productId,
productName: product.name,
quantity: event.quantity,
price: event.price,
total: itemTotal,
});
order.subtotal += itemTotal;
order.tax = order.subtotal * 0.1;
order.total = order.subtotal + order.tax;
order.updatedAt = event.metadata.timestamp;
await this.readModelStore.upsert('orders', event.aggregateId, order);
}
}
Saga Patterns
Sagas coordinate transactions across multiple services in a distributed system. There are two main approaches: Orchestration and Choreography.
Saga Orchestration
In the orchestration pattern, a central coordinator (orchestrator) manages the saga flow, telling each participant what to do.
Saga Orchestration Pattern
Saga Choreography
In the choreography pattern, each service publishes events and reacts to events from other services without a central coordinator.
Saga Choreography Pattern
Pattern Comparison
| Aspect | Orchestration | Choreography |
|---|---|---|
| Coordination | Central orchestrator | Distributed events |
| Coupling | Services coupled to orchestrator | Loose coupling |
| Complexity | Simpler flow understanding | Complex event chains |
| Single Point of Failure | Orchestrator is critical | No single point |
| Monitoring | Centralized visibility | Distributed tracing needed |
Orchestrator Implementation
interface SagaStep {
name: string;
execute: (context: SagaContext) => Promise<void>;
compensate: (context: SagaContext) => Promise<void>;
}
class OrderSagaOrchestrator {
private steps: SagaStep[] = [
{
name: 'ReserveInventory',
execute: async (ctx) => {
const result = await this.inventoryService.reserve(
ctx.orderId,
ctx.items
);
ctx.inventoryReservationId = result.reservationId;
},
compensate: async (ctx) => {
if (ctx.inventoryReservationId) {
await this.inventoryService.cancelReservation(
ctx.inventoryReservationId
);
}
},
},
{
name: 'ProcessPayment',
execute: async (ctx) => {
const result = await this.paymentService.charge(
ctx.customerId,
ctx.total
);
ctx.paymentId = result.paymentId;
},
compensate: async (ctx) => {
if (ctx.paymentId) {
await this.paymentService.refund(ctx.paymentId);
}
},
},
{
name: 'CreateShipment',
execute: async (ctx) => {
const result = await this.shippingService.createShipment(
ctx.orderId,
ctx.shippingAddress
);
ctx.shipmentId = result.shipmentId;
},
compensate: async (ctx) => {
if (ctx.shipmentId) {
await this.shippingService.cancelShipment(ctx.shipmentId);
}
},
},
];
async execute(context: SagaContext): Promise<SagaResult> {
const completedSteps: SagaStep[] = [];
try {
for (const step of this.steps) {
await step.execute(context);
completedSteps.push(step);
}
return { success: true, context };
} catch (error) {
// Compensate in reverse order
for (const step of completedSteps.reverse()) {
try {
await step.compensate(context);
} catch (compensateError) {
// Log compensation failure
console.error(`Compensation failed for ${step.name}`, compensateError);
}
}
return { success: false, error, context };
}
}
}
Event-Driven Messaging with Kafka
Apache Kafka provides the backbone for event-driven systems with durable, scalable message streaming.
Kafka Producer Configuration
import { Kafka, Producer, CompressionTypes } from 'kafkajs';
class EventPublisher {
private producer: Producer;
constructor(private kafka: Kafka) {
this.producer = kafka.producer({
idempotent: true,
maxInFlightRequests: 5,
transactionalId: 'order-service-producer',
});
}
async connect(): Promise<void> {
await this.producer.connect();
}
async publish(event: DomainEvent): Promise<void> {
const topic = `${event.aggregateType.toLowerCase()}-events`;
await this.producer.send({
topic,
messages: [
{
key: event.aggregateId,
value: JSON.stringify(event),
headers: {
eventType: event.eventType,
correlationId: event.metadata.correlationId,
timestamp: event.metadata.timestamp.toISOString(),
},
},
],
compression: CompressionTypes.GZIP,
});
}
async publishTransactional(events: DomainEvent[]): Promise<void> {
const transaction = await this.producer.transaction();
try {
for (const event of events) {
const topic = `${event.aggregateType.toLowerCase()}-events`;
await transaction.send({
topic,
messages: [
{
key: event.aggregateId,
value: JSON.stringify(event),
},
],
});
}
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
}
}
Kafka Consumer with Exactly-Once Processing
class EventConsumer {
private consumer: Consumer;
constructor(
private kafka: Kafka,
private groupId: string
) {
this.consumer = kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576,
});
}
async subscribe(topics: string[]): Promise<void> {
await this.consumer.connect();
for (const topic of topics) {
await this.consumer.subscribe({
topic,
fromBeginning: false,
});
}
}
async consume(handler: EventHandler): Promise<void> {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event: DomainEvent = JSON.parse(message.value!.toString());
try {
await handler.handle(event);
} catch (error) {
// Handle processing error
await this.handleError(event, error);
}
},
});
}
private async handleError(event: DomainEvent, error: Error): Promise<void> {
// Send to dead letter queue
await this.producer.send({
topic: 'dead-letter-queue',
messages: [
{
key: event.aggregateId,
value: JSON.stringify({
originalEvent: event,
error: error.message,
timestamp: new Date().toISOString(),
}),
},
],
});
}
}
Outbox Pattern
The Outbox pattern ensures reliable event publishing by storing events in the same transaction as the aggregate update.
Outbox Implementation
class OutboxRepository {
constructor(private db: Database) {}
async saveWithOutbox(
aggregate: EventSourcedAggregate,
transaction: Transaction
): Promise<void> {
const events = aggregate.getUncommittedEvents();
// Save aggregate state
await this.saveAggregate(aggregate, transaction);
// Save events to outbox table
for (const event of events) {
await transaction.execute(
`INSERT INTO outbox (event_id, aggregate_id, event_type, payload, created_at)
VALUES ($1, $2, $3, $4, $5)`,
[
event.eventId,
event.aggregateId,
event.eventType,
JSON.stringify(event),
new Date(),
]
);
}
}
}
class OutboxProcessor {
constructor(
private db: Database,
private eventPublisher: EventPublisher
) {}
async processOutbox(): Promise<void> {
const events = await this.db.query(
`SELECT * FROM outbox WHERE published_at IS NULL
ORDER BY created_at ASC LIMIT 100`
);
for (const row of events) {
try {
const event = JSON.parse(row.payload);
await this.eventPublisher.publish(event);
await this.db.execute(
`UPDATE outbox SET published_at = $1 WHERE event_id = $2`,
[new Date(), row.event_id]
);
} catch (error) {
await this.db.execute(
`UPDATE outbox SET retry_count = retry_count + 1,
last_error = $1 WHERE event_id = $2`,
[error.message, row.event_id]
);
}
}
}
}
Best Practices
1. Event Design
- Immutability: Events should never be modified once published
- Self-contained: Include all necessary information in the event
- Versioning: Plan for schema evolution from the start
- Naming: Use past-tense verbs (OrderCreated, PaymentProcessed)
2. Idempotency
- Deduplication: Track processed event IDs to prevent duplicate processing
- Idempotent Handlers: Design handlers that produce the same result regardless of how many times they're called
3. Error Handling
- Dead Letter Queues: Route failed events for manual inspection
- Retry Policies: Implement exponential backoff for transient failures
- Circuit Breakers: Prevent cascade failures in distributed systems
4. Monitoring
- Event Tracing: Use correlation IDs to trace events across services
- Metrics: Track event throughput, latency, and error rates
- Alerting: Set up alerts for lag, failures, and SLA breaches
Conclusion
Event-Driven Architecture provides a powerful foundation for building scalable, resilient distributed systems. The key patterns covered include:
- Event Sourcing: Store state as a sequence of events for complete audit trails
- CQRS: Separate read and write models for optimized performance
- Saga Orchestration: Centralized coordination of distributed transactions
- Saga Choreography: Decentralized event-driven coordination
- Outbox Pattern: Reliable event publishing with transactional guarantees
Choose the right patterns based on your consistency requirements, team expertise, and operational capabilities. Start simple and evolve your architecture as needs grow.
Further Reading
- Enterprise Integration Patterns
- Event Sourcing by Martin Fowler
- CQRS Journey by Microsoft
- Apache Kafka Documentation
- EventStoreDB Documentation
For complete implementation examples, visit the EventDrivenPlatform GitHub repository.