Kafka Streams Aggregation Patterns with Spring Cloud Stream: KTable, KStream, and Windowing Deep Dive

Master Kafka Streams aggregation patterns using Spring Cloud Stream. Learn how to implement stateful aggregations, KStream-KTable joins, windowed computations, and real-time analytics pipelines with practical Java examples.

Gonnect Team
January 14, 2024 · 14 min read
Java · Kafka · Spring Cloud Stream · Kafka Streams

Introduction

Real-time stream processing has become essential for modern data architectures. Organizations need to aggregate, analyze, and act on streaming data within milliseconds. Apache Kafka Streams, combined with Spring Cloud Stream, provides a powerful framework for building stateful stream processing applications that can handle millions of events per second.

This article focuses on the aggregation patterns available in Kafka Streams, exploring how to transform unbounded event streams into meaningful business insights through stateful computations.

Key Insight: Aggregation is the process of combining multiple records into a single result. In streaming contexts, this requires careful consideration of time semantics, state management, and exactly-once processing guarantees.

Understanding KStream and KTable: The Duality Principle

Before diving into aggregation patterns, it's essential to understand the fundamental abstractions in Kafka Streams:

KStream: The Event Stream

A KStream represents an unbounded, continuously updating stream of records. Each record is an independent, immutable fact:

// KStream: Each record is independent
// Key: userId, Value: clickEvent
// ("alice", click1) -> ("bob", click2) -> ("alice", click3)
// All three records are preserved in the stream

Characteristics:

  • Append-only semantics
  • Each record represents an event at a point in time
  • No concept of "current state" per key
  • Ideal for events, transactions, and logs

KTable: The Changelog Stream

A KTable represents a changelog stream where each record is an update to the current value for a key:

// KTable: Each record updates the value for its key
// Key: userId, Value: userProfile
// ("alice", profile1) -> ("alice", profile2)
// Only profile2 is retained as the current state for "alice"

Characteristics:

  • Upsert semantics (insert or update)
  • Maintains current state per key
  • Backed by a state store
  • Ideal for reference data, aggregation results, and materialized views

The Stream-Table Duality

Kafka Streams embraces the stream-table duality principle:

  • Stream to Table: Aggregate a stream to get the current state per key
  • Table to Stream: Convert changelog updates into an event stream

// Stream to Table: Aggregation
KTable<String, Long> userClickCounts = userClicks
    .groupByKey()
    .count();

// Table to Stream: Convert back
KStream<String, Long> clickCountUpdates = userClickCounts.toStream();

Aggregation Fundamentals

Aggregation in Kafka Streams always follows a specific pattern:

  1. Group by key - Organize records by a grouping key
  2. Apply aggregation - Compute the aggregated value
  3. Materialize - Store results in a state store
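The same group-then-aggregate shape appears in miniature with plain Java collections. The following is only a batch analogy (not Kafka Streams code): `groupingBy` plays the role of step 1, `counting` is step 2, and the resulting map stands in for the materialized state store of step 3. In Kafka Streams, the same shape runs continuously over an unbounded stream, with the running result maintained in the store.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupAndAggregate {

    // Batch analogy of the streaming pattern: group by key, then aggregate.
    static Map<String, Long> countByKey(List<String> keys) {
        return keys.stream()
            .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Each element is one "record key"; the result is the per-key count.
        System.out.println(countByKey(List.of("alice", "bob", "alice")));
    }
}
```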

The Three Aggregation Operations

1. Count

The simplest aggregation - counts records per key:

@EnableBinding(KafkaStreamsProcessor.class)
public class EventCounter {

    @StreamListener("input")
    @SendTo("output")
    public KStream<String, Long> countEvents(KStream<String, Event> events) {
        return events
            .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
            .count(Materialized.as("event-counts-store"))
            .toStream();
    }
}

2. Reduce

Combines values using a binary operator - requires same input and output types:

@StreamListener("input")
@SendTo("output")
public KStream<String, Long> sumClicks(KStream<String, Long> userClicks) {
    return userClicks
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(
            (clicks1, clicks2) -> clicks1 + clicks2,  // Adder
            Materialized.as("click-sums-store")
        )
        .toStream();
}

3. Aggregate

The most flexible option - supports different input and output types with custom logic:

@StreamListener("input")
@SendTo("output")
public KStream<String, UserStatistics> aggregateUserActivity(
        KStream<String, UserActivity> activities) {

    return activities
        .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(UserActivity.class)))
        .aggregate(
            // Initializer: Create new statistics object
            UserStatistics::new,

            // Aggregator: Update statistics with each activity
            (userId, activity, stats) -> {
                stats.incrementActivityCount();
                stats.addDuration(activity.getDuration());
                stats.updateLastSeen(activity.getTimestamp());
                return stats;
            },

            // Materialized: Configure state store
            Materialized.<String, UserStatistics, KeyValueStore<Bytes, byte[]>>as("user-stats-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(new JsonSerde<>(UserStatistics.class))
        )
        .toStream();
}
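The example above assumes a mutable UserStatistics accumulator that is not shown. A minimal sketch consistent with the three calls the aggregator makes (the field names and millisecond-based duration are assumptions, not project code) could look like:

```java
import java.time.Instant;

// Hypothetical accumulator matching the methods the aggregator calls above.
// JsonSerde requires a no-arg constructor and getters, both present here.
public class UserStatistics {
    private long activityCount;
    private long totalDurationMs;
    private Instant lastSeen;

    public void incrementActivityCount() { activityCount++; }

    public void addDuration(long durationMs) { totalDurationMs += durationMs; }

    public void updateLastSeen(Instant timestamp) {
        // Out-of-order activities must not move lastSeen backwards
        if (lastSeen == null || timestamp.isAfter(lastSeen)) {
            lastSeen = timestamp;
        }
    }

    public long getActivityCount() { return activityCount; }
    public long getTotalDurationMs() { return totalDurationMs; }
    public Instant getLastSeen() { return lastSeen; }
}
```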

Implementing Stream-Table Joins with Aggregation

The kafka-stream-aggregation project demonstrates a powerful pattern: joining streams with tables and then aggregating the enriched data.

Use Case: Regional Click Analytics

Consider a scenario where you need to:

  1. Enrich user click events with their region information
  2. Aggregate total clicks per region

Domain Model

// User click event from the stream
public class UserClick {
    private String userId;
    private long clicks;
    private Instant timestamp;

    // Constructors, getters, setters...
}

// Region enriched click for aggregation
public class RegionWithClicks {
    private final String region;
    private final long clicks;

    public RegionWithClicks(String region, long clicks) {
        if (region == null || region.isEmpty()) {
            throw new IllegalArgumentException("Region must be set");
        }
        if (clicks < 0) {
            throw new IllegalArgumentException("Clicks must not be negative");
        }
        this.region = region;
        this.clicks = clicks;
    }

    public String getRegion() { return region; }
    public long getClicks() { return clicks; }
}

Stream Processing Implementation

@SpringBootApplication
public class KafkaStreamAggregationApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamAggregationApplication.class, args);
    }

    @EnableBinding(KStreamProcessorWithTable.class)
    public static class RegionalClickAggregator {

        @StreamListener
        @SendTo("output")
        public KStream<String, Long> process(
                @Input("input") KStream<String, Long> userClicksStream,
                @Input("inputTable") KTable<String, String> userRegionsTable) {

            return userClicksStream
                // Step 1: Enrich clicks with region via left join
                .leftJoin(
                    userRegionsTable,
                    (clicks, region) -> new RegionWithClicks(
                        region != null ? region : "UNKNOWN",
                        clicks
                    ),
                    Joined.with(Serdes.String(), Serdes.Long(), Serdes.String())
                )

                // Step 2: Re-key by region for aggregation
                .map((userId, regionWithClicks) -> new KeyValue<>(
                    regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()
                ))

                // Step 3: Group by the new key (region)
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))

                // Step 4: Aggregate using reduce
                .reduce(
                    (totalClicks, newClicks) -> totalClicks + newClicks,
                    Materialized.as("regional-clicks-store")
                )

                // Step 5: Convert back to stream for output
                .toStream();
        }
    }

    // Custom binding interface for multiple inputs
    interface KStreamProcessorWithTable extends KafkaStreamsProcessor {
        @Input("inputTable")
        KTable<?, ?> inputKTable();
    }
}

Configuration

spring:
  application:
    name: kafka-stream-aggregation
  cloud:
    stream:
      bindings:
        input:
          destination: user-clicks
          consumer:
            useNativeDecoding: true
        inputTable:
          destination: user-regions
          consumer:
            useNativeDecoding: true
        output:
          destination: regional-click-totals
          producer:
            useNativeEncoding: true
      kafka:
        streams:
          bindings:
            input:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
            inputTable:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
            output:
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
          binder:
            brokers: localhost:9092
            configuration:
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              commit.interval.ms: 1000

Windowed Aggregations

For time-bounded analytics, Kafka Streams provides windowing capabilities that partition the stream into finite time intervals.

Window Types

  • Tumbling: fixed-size, non-overlapping (hourly/daily totals)
  • Hopping: fixed-size, overlapping (moving averages)
  • Session: dynamic, gap-based (user session analytics)
  • Sliding: event-triggered, fixed duration (recent activity windows)

Tumbling Window Aggregation

@StreamListener("input")
@SendTo("output")
public KStream<Windowed<String>, Long> hourlyClickCounts(
        KStream<String, Long> userClicks) {

    return userClicks
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))

        // Apply 1-hour tumbling windows
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))

        // Count clicks within each window
        .count(Materialized.as("hourly-clicks-store"))

        .toStream();
}

Hopping Window Aggregation

Hopping windows allow overlapping time intervals for smoother analytics:

@StreamListener("input")
@SendTo("output")
public KStream<Windowed<String>, Double> movingAverageClicks(
        KStream<String, ClickEvent> clicks) {

    return clicks
        .groupByKey()

        // 5-minute windows, advancing every 1 minute
        .windowedBy(TimeWindows
            .ofSizeWithNoGrace(Duration.ofMinutes(5))
            .advanceBy(Duration.ofMinutes(1)))

        .aggregate(
            AggregateResult::new,
            (key, click, aggregate) -> {
                aggregate.addValue(click.getValue());
                return aggregate;
            },
            Materialized.as("moving-average-store")
        )

        .toStream()
        .mapValues(AggregateResult::getAverage);
}
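The moving-average example assumes an AggregateResult accumulator that is not shown. A minimal sketch consistent with the calls above (a hypothetical sum-and-count pair, not project code) could be the following; with it, `.mapValues(AggregateResult::getAverage)` yields a stream of Double averages:

```java
// Hypothetical accumulator for the moving-average example above:
// tracks a running sum and count so each window's average can be read out.
public class AggregateResult {
    private double sum;
    private long count;

    public void addValue(double value) {
        sum += value;
        count++;
    }

    public double getAverage() {
        // Guard against division by zero for a freshly initialized window
        return count == 0 ? 0.0 : sum / count;
    }
}
```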

Session Window Aggregation

Session windows group events by periods of activity:

@StreamListener("input")
@SendTo("output")
public KStream<Windowed<String>, SessionSummary> userSessionAnalysis(
        KStream<String, UserEvent> events) {

    return events
        .groupByKey()

        // Sessions with 30-minute inactivity gap
        .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))

        .aggregate(
            SessionSummary::new,
            (key, event, session) -> {
                session.addEvent(event);
                return session;
            },
            // Session merger for adjacent sessions
            (key, session1, session2) -> session1.merge(session2),
            Materialized.as("session-store")
        )

        .toStream();
}
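Session windows can merge when a late event bridges two previously separate sessions, which is why the aggregation takes a session merger in addition to the aggregator. A hypothetical SessionSummary consistent with the calls above (the fields and the nested UserEvent stand-in are assumptions, not project code) might track the event count and session bounds:

```java
import java.time.Instant;

// Hypothetical session accumulator consistent with the aggregator above.
public class SessionSummary {
    private long eventCount;
    private Instant firstEvent;
    private Instant lastEvent;

    // Minimal stand-in for the UserEvent type used in the topology.
    public static class UserEvent {
        private final String userId;
        private final Instant timestamp;
        public UserEvent(String userId, Instant timestamp) {
            this.userId = userId;
            this.timestamp = timestamp;
        }
        public Instant getTimestamp() { return timestamp; }
    }

    public void addEvent(UserEvent event) {
        eventCount++;
        Instant ts = event.getTimestamp();
        if (firstEvent == null || ts.isBefore(firstEvent)) firstEvent = ts;
        if (lastEvent == null || ts.isAfter(lastEvent)) lastEvent = ts;
    }

    // Combines two sessions when a late event bridges the inactivity gap.
    public SessionSummary merge(SessionSummary other) {
        SessionSummary merged = new SessionSummary();
        merged.eventCount = this.eventCount + other.eventCount;
        merged.firstEvent = earlier(this.firstEvent, other.firstEvent);
        merged.lastEvent = later(this.lastEvent, other.lastEvent);
        return merged;
    }

    private static Instant earlier(Instant a, Instant b) {
        if (a == null) return b;
        if (b == null) return a;
        return a.isBefore(b) ? a : b;
    }

    private static Instant later(Instant a, Instant b) {
        if (a == null) return b;
        if (b == null) return a;
        return a.isAfter(b) ? a : b;
    }

    public long getEventCount() { return eventCount; }
    public Instant getFirstEvent() { return firstEvent; }
    public Instant getLastEvent() { return lastEvent; }
}
```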

Running the Application

Prerequisites

  • Docker and Docker Compose
  • Java 11 or higher
  • Maven

Starting the Infrastructure

# Clone the repository
git clone https://github.com/mgorav/kafka-stream-aggregation.git
cd kafka-stream-aggregation

# Start Kafka and Zookeeper
docker-compose up -d

# Verify containers are running
docker-compose ps

Building and Running

# Build the application
./mvnw clean package

# Run the application
java -jar target/kafka-stream-aggregation-0.0.1-SNAPSHOT.jar

Producing Test Data

public class TestDataProducer {
    public static void main(String[] args) {
        // User click events (KStream input)
        List<KeyValue<String, Long>> userClicks = Arrays.asList(
            new KeyValue<>("alice", 13L),
            new KeyValue<>("bob", 4L),
            new KeyValue<>("chao", 25L),
            new KeyValue<>("bob", 19L),
            new KeyValue<>("dave", 56L),
            new KeyValue<>("eve", 78L),
            new KeyValue<>("alice", 40L),
            new KeyValue<>("fang", 99L)
        );

        // User region reference data (KTable input)
        List<KeyValue<String, String>> userRegions = Arrays.asList(
            new KeyValue<>("alice", "asia"),
            new KeyValue<>("bob", "americas"),
            new KeyValue<>("chao", "asia"),
            new KeyValue<>("dave", "europe"),
            new KeyValue<>("alice", "europe"),  // Alice moved regions
            new KeyValue<>("eve", "americas"),
            new KeyValue<>("fang", "asia")
        );

        // Produce to respective topics...
    }
}

Consuming Results

# Consume aggregated results
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --topic regional-click-totals \
  --property print.key=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
  --from-beginning

Expected output:

asia     137    # (alice:13 + chao:25 + fang:99)
americas 101    # (bob:4 + bob:19 + eve:78)
europe    96    # (dave:56 + alice:40 after region change)
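These totals follow from the KTable's latest-value semantics: each click is attributed to the region recorded for that user at the moment the click arrives. A small batch simulation (one possible event interleaving; illustrative, not project code) reproduces the expected totals:

```java
import java.util.HashMap;
import java.util.Map;

public class RegionalTotalsSimulation {

    // Replays the KStream-KTable join in arrival order: a region event
    // updates the table; a click is attributed to the user's current region.
    public static Map<String, Long> run(String[][] events) {
        Map<String, String> regionTable = new HashMap<>();  // the KTable
        Map<String, Long> totals = new HashMap<>();         // the aggregate
        for (String[] e : events) {
            if (e[0].equals("region")) {
                regionTable.put(e[1], e[2]);
            } else {  // click event: {"click", user, count}
                String region = regionTable.getOrDefault(e[1], "UNKNOWN");
                totals.merge(region, Long.parseLong(e[2]), Long::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // One interleaving consistent with the expected output above:
        // alice's first 13 clicks land while she is in asia, her later 40 in europe.
        String[][] events = {
            {"region", "alice", "asia"}, {"region", "bob", "americas"},
            {"region", "chao", "asia"}, {"region", "dave", "europe"},
            {"region", "eve", "americas"}, {"region", "fang", "asia"},
            {"click", "alice", "13"}, {"click", "bob", "4"},
            {"click", "chao", "25"}, {"click", "bob", "19"},
            {"click", "dave", "56"}, {"click", "eve", "78"},
            {"region", "alice", "europe"},  // alice moves regions
            {"click", "alice", "40"}, {"click", "fang", "99"},
        };
        Map<String, Long> totals = run(events);
        System.out.println("asia=" + totals.get("asia"));          // asia=137
        System.out.println("americas=" + totals.get("americas"));  // americas=101
        System.out.println("europe=" + totals.get("europe"));      // europe=96
    }
}
```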

State Store Management

Aggregations require state stores to maintain intermediate results. Understanding state store configuration is crucial for production deployments.

State Store Types

  • In-Memory: backed by a hash map (development, small state)
  • RocksDB: disk-based (production, large state; the default)
  • Custom: user-defined (special requirements)

Configuring State Stores

// In-memory store for development: an explicit store supplier is required,
// since Materialized.as("name") alone still uses the default RocksDB store
Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("my-store"))
    .withLoggingDisabled()  // Disable changelog for testing

// RocksDB store for production (default)
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-store")
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long())
    .withCachingEnabled()   // Enable record cache
    .withLoggingEnabled(Map.of(
        "retention.ms", "604800000"  // 7 days retention
    ))

Interactive Queries

State stores can be queried directly for real-time lookups:

@RestController
public class AggregationQueryController {

    private final InteractiveQueryService queryService;

    public AggregationQueryController(InteractiveQueryService queryService) {
        this.queryService = queryService;
    }

    @GetMapping("/clicks/region/{region}")
    public Long getClicksByRegion(@PathVariable String region) {
        ReadOnlyKeyValueStore<String, Long> store = queryService
            .getQueryableStore(
                "regional-clicks-store",
                QueryableStoreTypes.keyValueStore()
            );

        return store.get(region);
    }

    @GetMapping("/clicks/all")
    public Map<String, Long> getAllRegionalClicks() {
        ReadOnlyKeyValueStore<String, Long> store = queryService
            .getQueryableStore(
                "regional-clicks-store",
                QueryableStoreTypes.keyValueStore()
            );

        Map<String, Long> results = new HashMap<>();
        try (KeyValueIterator<String, Long> iterator = store.all()) {
            iterator.forEachRemaining(kv -> results.put(kv.key, kv.value));
        }
        return results;
    }
}

Best Practices for Production

Serialization

Always use explicit serdes configuration:

// Good: Explicit serde configuration
.groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))

// Avoid: Relying on default serdes
.groupByKey()  // May fail with ClassCastException

Error Handling

Configure deserialization error handling:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler

Exactly-Once Semantics

Enable exactly-once processing for critical pipelines:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              processing.guarantee: exactly_once_v2
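Exactly-once pairs naturally with state-store fault-tolerance settings. Two commonly tuned Kafka Streams properties, shown here in the same binder configuration block, are num.standby.replicas (warm replicas of state stores for fast failover) and state.dir (the local state directory):

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              processing.guarantee: exactly_once_v2
              num.standby.replicas: 1            # warm standby of each state store
              state.dir: /var/lib/kafka-streams  # local RocksDB state location
```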

Monitoring

Expose Kafka Streams metrics:

@Bean
public MeterBinder kafkaStreamsMetrics(StreamsBuilderFactoryBean factory) {
    // Micrometer's KafkaStreamsMetrics binder (io.micrometer.core.instrument
    // .binder.kafka) registers the client's metrics with the meter registry;
    // the factory must be started before getKafkaStreams() returns a client
    return new KafkaStreamsMetrics(factory.getKafkaStreams());
}

Conclusion

Kafka Streams aggregation patterns provide powerful primitives for building real-time analytics applications. The combination of KStream and KTable abstractions, along with windowing capabilities, enables sophisticated stream processing scenarios:

  • Count, Reduce, and Aggregate operations cover the full spectrum of aggregation needs
  • Stream-Table joins enable enrichment before aggregation
  • Windowed aggregations provide time-bounded analytics
  • State stores maintain aggregation results with fault tolerance
  • Interactive queries expose real-time state for serving layers

The kafka-stream-aggregation project demonstrates these patterns with Spring Cloud Stream, providing a production-ready foundation for building your own real-time aggregation pipelines.

When designing aggregation pipelines, consider:

  • Co-partitioning requirements for joins
  • State store sizing for your data volume
  • Window semantics that match your business requirements
  • Exactly-once guarantees for critical data

References and Further Reading

  • GitHub Repository: https://github.com/mgorav/kafka-stream-aggregation
  • Documentation