Real-Time Twitter Trend Analysis with Spring Cloud Stream and Kafka

Build a scalable event streaming pipeline for Twitter trend analysis using Spring Cloud Stream, Kafka Streams, and MapR-ES. Master KStream, KTable, and processor patterns for real-time analytics.

Gonnect Team
January 14, 2024 · 12 min read

Java · Spring Cloud Stream · Apache Kafka · MapR-ES · Kafka Streams

Introduction

Social media platforms generate massive volumes of real-time data that hold valuable insights - trending topics, sentiment shifts, and emerging conversations. Capturing and analyzing this data at scale requires a robust event streaming architecture that can handle millions of events per second.

This project demonstrates building a Twitter trend analysis pipeline using Spring Cloud Stream with Apache Kafka (and MapR-ES). By leveraging the abstractions provided by Spring Cloud Stream, we create a broker-agnostic streaming application that can run on various messaging platforms while maintaining high throughput and low latency.

Key Insight: Spring Cloud Stream + Kafka Streams = Developer Productivity + High Performance + Scalability. The combination provides the best of both worlds - Spring's ease of use with Kafka's streaming capabilities.

Architecture Overview

Event-Driven Architecture


System Components

┌─────────────────────────────────────────────────────────────────────┐
│                    TWITTER TREND ANALYSIS PIPELINE                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                       │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐           │
│  │   Twitter    │    │   Twitter    │    │   Twitter    │           │
│  │   Stream 1   │    │   Stream 2   │    │   Stream N   │           │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘           │
│         │                   │                   │                    │
│         └───────────────────┼───────────────────┘                    │
│                             │                                        │
│                    ┌────────▼────────┐                               │
│                    │  Kafka/MapR-ES  │                               │
│                    │  (tweets topic) │                               │
│                    └────────┬────────┘                               │
│                             │                                        │
│         ┌───────────────────┼───────────────────┐                    │
│         │                   │                   │                    │
│  ┌──────▼──────┐    ┌───────▼───────┐   ┌──────▼──────┐             │
│  │  Hashtag    │    │   Sentiment   │   │   User      │             │
│  │  Processor  │    │   Analyzer    │   │  Influence  │             │
│  └──────┬──────┘    └───────┬───────┘   └──────┬──────┘             │
│         │                   │                   │                    │
│  ┌──────▼──────┐    ┌───────▼───────┐   ┌──────▼──────┐             │
│  │  Trending   │    │   Sentiment   │   │  Influencer │             │
│  │  Topics     │    │   Dashboard   │   │  Rankings   │             │
│  └─────────────┘    └───────────────┘   └─────────────┘             │
│                                                                       │
└─────────────────────────────────────────────────────────────────────┘

Technology Stack

| Component  | Technology             | Purpose                   |
|------------|------------------------|---------------------------|
| Messaging  | Apache Kafka / MapR-ES | Event streaming backbone  |
| Framework  | Spring Cloud Stream    | Broker-agnostic streaming |
| Processing | Kafka Streams          | KStream/KTable processing |
| Language   | Java 17+               | Application development   |

Spring Cloud Stream Fundamentals

Core Concepts


Destination Binders: Abstraction responsible for providing integration with external messaging systems. This allows the same application code to work with Kafka, RabbitMQ, or other brokers.

Destination Bindings: Bridge between external message brokers and application-provided channels through the @EnableBinding annotation.

Message Channels:

  • Source (Producer): Sends messages to destinations
  • Sink (Consumer): Receives messages from destinations
  • Processor: Combines sink and source contracts

Programming Model

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

// Source - produces messages
public interface TweetSource {
    String OUTPUT = "tweets-out";

    @Output(OUTPUT)
    MessageChannel output();
}

// Sink - consumes messages
public interface TweetSink {
    String INPUT = "tweets-in";

    @Input(INPUT)
    SubscribableChannel input();
}

// Processor - consumes and produces
public interface TweetProcessor extends TweetSource, TweetSink {
    // Inherits both the input and output channels
}

Implementation Deep Dive

Tweet Domain Model

package com.gonnect.twitter.model;

import java.time.Instant;
import java.util.List;

public record Tweet(
    String id,
    String text,
    String userId,
    String username,
    List<String> hashtags,
    List<String> mentions,
    int retweetCount,
    int likeCount,
    int followerCount,
    String language,
    Instant createdAt
) {

    public boolean isRetweet() {
        return text.startsWith("RT @");
    }

    public boolean hasHashtags() {
        return hashtags != null && !hashtags.isEmpty();
    }
}

// TrendingTopic.java - each public record lives in its own source file
public record TrendingTopic(
    String hashtag,
    long count,
    long windowStart,
    long windowEnd
) {}

// UserInfluence.java
public record UserInfluence(
    String userId,
    String username,
    long totalEngagement,
    int tweetCount
) {}
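The Tweet record carries a pre-parsed hashtags list. If the ingestion layer only delivers raw text, a small extractor can populate it; the helper below is a hypothetical sketch (not part of the original project) using a simple word-character pattern:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HashtagExtractor {

    // A '#' followed by one or more letters, digits, or underscores
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    // Returns the hashtags found in the tweet text, without the '#' prefix
    public static List<String> extract(String text) {
        List<String> tags = new ArrayList<>();
        Matcher matcher = HASHTAG.matcher(text);
        while (matcher.find()) {
            tags.add(matcher.group(1));
        }
        return List.copyOf(tags);
    }
}
```

For example, extract("RT @a #Kafka rocks #streaming") yields [Kafka, streaming].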

Tweet Ingestion Source

package com.gonnect.twitter.source;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;

@SpringBootApplication
@EnableBinding(Source.class)
public class TweetSourceApplication {

    private final TwitterClient twitterClient;

    public TweetSourceApplication(TwitterClient twitterClient) {
        this.twitterClient = twitterClient;
    }

    public static void main(String[] args) {
        SpringApplication.run(TweetSourceApplication.class, args);
    }

    @InboundChannelAdapter(
        value = Source.OUTPUT,
        poller = @Poller(fixedDelay = "100", maxMessagesPerPoll = "10")
    )
    public Tweet fetchTweet() {
        return twitterClient.getNextTweet();
    }
}

Hashtag Trend Processor

package com.gonnect.twitter.processor;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.messaging.handler.annotation.SendTo;

import java.time.Duration;

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class HashtagTrendProcessor {

    public static void main(String[] args) {
        SpringApplication.run(HashtagTrendProcessor.class, args);
    }

    @StreamListener("input")
    @SendTo("output")
    public KStream<String, TrendingTopic> processHashtags(
            KStream<String, Tweet> tweetsStream) {

        return tweetsStream
            // Filter tweets with hashtags
            .filter((key, tweet) -> tweet.hasHashtags())

            // Flatten hashtags - one record per hashtag
            .flatMapValues(tweet -> tweet.hashtags())

            // Re-key by hashtag for grouping
            .map((key, hashtag) -> new KeyValue<>(
                hashtag.toLowerCase(),
                hashtag
            ))

            // Group by hashtag
            .groupByKey(Grouped.with(
                Serdes.String(),
                Serdes.String()
            ))

            // Apply tumbling window of 5 minutes
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))

            // Count occurrences
            .count(Materialized.as("hashtag-counts-store"))

            // Convert to output stream
            .toStream()

            // Transform to TrendingTopic
            .map((windowedKey, count) -> new KeyValue<>(
                windowedKey.key(),
                new TrendingTopic(
                    windowedKey.key(),
                    count,
                    windowedKey.window().start(),
                    windowedKey.window().end()
                )
            ))

            // Filter to only significant trends (>10 occurrences)
            .filter((key, topic) -> topic.count() >= 10);
    }
}
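A tumbling window assigns each record to exactly one window by rounding its timestamp down to a multiple of the window size; the windowStart and windowEnd values carried by TrendingTopic come from exactly this alignment. A standalone sketch of the arithmetic:

```java
import java.time.Duration;

public class TumblingWindowMath {

    // Start of the tumbling window containing the timestamp:
    // the timestamp rounded down to a multiple of the window size
    public static long windowStart(long timestampMs, Duration windowSize) {
        long sizeMs = windowSize.toMillis();
        return timestampMs - (timestampMs % sizeMs);
    }

    // End of that window, exclusive
    public static long windowEnd(long timestampMs, Duration windowSize) {
        return windowStart(timestampMs, windowSize) + windowSize.toMillis();
    }
}
```

So with a 5-minute window, a tweet timestamped 7.5 minutes after epoch (450,000 ms) lands in the [300,000, 600,000) window.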

Sentiment Analysis Processor

package com.gonnect.twitter.processor;

import org.apache.kafka.streams.kstream.*;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.List;

@Component
@EnableBinding(KafkaStreamsProcessor.class)
public class SentimentAnalysisProcessor {

    private final SentimentAnalyzer sentimentAnalyzer;

    public SentimentAnalysisProcessor(SentimentAnalyzer sentimentAnalyzer) {
        this.sentimentAnalyzer = sentimentAnalyzer;
    }

    @StreamListener("input")
    @SendTo("output")
    public KStream<String, SentimentResult> analyzeSentiment(
            KStream<String, Tweet> tweetsStream) {

        return tweetsStream
            // Analyze sentiment for each tweet
            .mapValues(tweet -> {
                double score = sentimentAnalyzer.analyze(tweet.text());
                String sentiment = categorizeSentiment(score);

                return new SentimentResult(
                    tweet.id(),
                    tweet.text(),
                    score,
                    sentiment,
                    tweet.hashtags(),
                    tweet.createdAt()
                );
            })

            // Re-key by sentiment category for downstream aggregation
            .selectKey((key, result) -> result.sentiment());
    }

    private String categorizeSentiment(double score) {
        if (score > 0.3) return "positive";
        if (score < -0.3) return "negative";
        return "neutral";
    }
}

record SentimentResult(
    String tweetId,
    String text,
    double score,
    String sentiment,
    List<String> hashtags,
    Instant createdAt
) {}
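SentimentAnalyzer is an injected dependency whose implementation isn't shown above. A minimal lexicon-based stand-in (purely illustrative; a production system would use a trained NLP model) that returns a score in [-1, 1], matching the thresholds in categorizeSentiment:

```java
import java.util.Locale;
import java.util.Set;

public class SimpleSentimentAnalyzer {

    // Tiny illustrative lexicons; a real analyzer would use a trained model
    private static final Set<String> POSITIVE =
        Set.of("great", "love", "awesome", "good", "happy");
    private static final Set<String> NEGATIVE =
        Set.of("bad", "hate", "awful", "terrible", "sad");

    // Score = (positive hits - negative hits) / word count, clamped to [-1, 1]
    public double analyze(String text) {
        String[] words = text.toLowerCase(Locale.ROOT).split("\\W+");
        int score = 0;
        for (String word : words) {
            if (POSITIVE.contains(word)) score++;
            if (NEGATIVE.contains(word)) score--;
        }
        if (words.length == 0) return 0.0;
        return Math.max(-1.0, Math.min(1.0, (double) score / words.length));
    }
}
```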

User Influence Aggregator

package com.gonnect.twitter.processor;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(KafkaStreamsProcessor.class)
public class UserInfluenceAggregator {

    @StreamListener("input")
    @SendTo("output")
    public KStream<String, UserInfluence> aggregateInfluence(
            KStream<String, Tweet> tweetsStream) {

        return tweetsStream
            // Group by user
            .groupBy(
                (key, tweet) -> tweet.userId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(Tweet.class))
            )

            // Aggregate user metrics
            .aggregate(
                // Initializer
                () -> new UserInfluenceAccumulator(),

                // Aggregator
                (userId, tweet, accumulator) -> {
                    accumulator.addTweet(tweet);
                    return accumulator;
                },

                // Materialized store
                Materialized.<String, UserInfluenceAccumulator, KeyValueStore<Bytes, byte[]>>
                    as("user-influence-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(UserInfluenceAccumulator.class))
            )

            // Convert to stream
            .toStream()

            // Map to output format
            .mapValues(accumulator -> new UserInfluence(
                accumulator.getUserId(),
                accumulator.getUsername(),
                accumulator.getTotalEngagement(),
                accumulator.getTweetCount()
            ))

            // Filter to only influential users
            .filter((userId, influence) -> influence.totalEngagement() > 100);
    }
}

class UserInfluenceAccumulator {
    private String userId;
    private String username;
    private long totalEngagement = 0;
    private int tweetCount = 0;

    public void addTweet(Tweet tweet) {
        this.userId = tweet.userId();
        this.username = tweet.username();
        this.totalEngagement += tweet.retweetCount() + tweet.likeCount();
        this.tweetCount++;
    }

    // Getters (also used by the JSON serde)
    public String getUserId() { return userId; }
    public String getUsername() { return username; }
    public long getTotalEngagement() { return totalEngagement; }
    public int getTweetCount() { return tweetCount; }
}
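Outside of Kafka Streams, the aggregate step above reduces to a group-and-sum: per user, total engagement is the sum of retweets plus likes across all tweets. A plain-Java equivalent over a simplified tweet shape (TweetStats is a stand-in for just the fields the fold reads):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class EngagementFold {

    // Simplified stand-in for Tweet: only the fields the aggregation reads
    public record TweetStats(String userId, int retweetCount, int likeCount) {}

    // Mirrors UserInfluenceAccumulator.addTweet: engagement = retweets + likes
    public static Map<String, Long> totalEngagementByUser(List<TweetStats> tweets) {
        return tweets.stream().collect(Collectors.groupingBy(
            TweetStats::userId,
            Collectors.summingLong(t -> t.retweetCount() + t.likeCount())
        ));
    }
}
```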

Branching by Language

package com.gonnect.twitter.processor;

import org.apache.kafka.streams.kstream.*;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(MultiOutputProcessor.class)
public class LanguageBranchingProcessor {

    @StreamListener("input")
    @SendTo({"englishOutput", "spanishOutput", "otherOutput"})
    @SuppressWarnings("unchecked")
    public KStream<String, Tweet>[] branchByLanguage(
            KStream<String, Tweet> tweetsStream) {

        // Define predicates for each language
        Predicate<String, Tweet> isEnglish = (key, tweet) ->
            "en".equals(tweet.language());

        Predicate<String, Tweet> isSpanish = (key, tweet) ->
            "es".equals(tweet.language());

        Predicate<String, Tweet> isOther = (key, tweet) ->
            !"en".equals(tweet.language()) && !"es".equals(tweet.language());

        // Branch the stream
        return tweetsStream.branch(isEnglish, isSpanish, isOther);
    }
}

interface MultiOutputProcessor {
    @Input("input")
    KStream<String, Tweet> input();

    @Output("englishOutput")
    KStream<String, Tweet> englishOutput();

    @Output("spanishOutput")
    KStream<String, Tweet> spanishOutput();

    @Output("otherOutput")
    KStream<String, Tweet> otherOutput();
}

Configuration

Application Properties

# application.yml
spring:
  application:
    name: twitter-trend-analyzer
  cloud:
    stream:
      bindings:
        input:
          destination: tweets
          group: trend-analyzer
          consumer:
            useNativeDecoding: true
        output:
          destination: trending-topics
          producer:
            useNativeEncoding: true

      kafka:
        streams:
          binder:
            brokers: localhost:9092
            applicationId: twitter-trend-analyzer
            configuration:
              commit.interval.ms: 1000
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

          bindings:
            input:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: com.gonnect.twitter.serde.TweetSerde
            output:
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: com.gonnect.twitter.serde.TrendingTopicSerde

          time-window:
            length: 300000  # 5 minutes in milliseconds
            advanceBy: 60000  # Advance by 1 minute (hopping window)

# Kafka consumer configuration
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false

logging:
  level:
    org.apache.kafka.streams: INFO
    org.springframework.cloud.stream: DEBUG
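The bindings above reference custom serdes (TweetSerde, TrendingTopicSerde) whose sources aren't shown; a real one implements org.apache.kafka.common.serialization.Serde, typically delegating to JSON or Avro. The encode/decode core for TrendingTopic, stripped of the Kafka interfaces and using a pipe-delimited layout purely for illustration:

```java
public class TrendingTopicCodec {

    public record TrendingTopic(String hashtag, long count, long windowStart, long windowEnd) {}

    // Encode as "hashtag|count|windowStart|windowEnd"
    public static String serialize(TrendingTopic topic) {
        return String.join("|",
            topic.hashtag(),
            Long.toString(topic.count()),
            Long.toString(topic.windowStart()),
            Long.toString(topic.windowEnd()));
    }

    // Decode the same layout back into a record
    public static TrendingTopic deserialize(String data) {
        String[] parts = data.split("\\|");
        return new TrendingTopic(
            parts[0],
            Long.parseLong(parts[1]),
            Long.parseLong(parts[2]),
            Long.parseLong(parts[3]));
    }
}
```

A serde built this way must round-trip: deserialize(serialize(t)) yields a record equal to t.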

MapR-ES Specific Configuration

# application-mapr.yml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: ${MAPR_KAFKA_BROKER:mapr-cluster:9092}
            configuration:
              # MapR-ES uses stream paths instead of topic names
              # Topics are prefixed with stream path: /mapr/stream:topic
              streams.topic.prefix: /mapr/twitter-stream:

Running the Application

Prerequisites

# Start Kafka (or MapR sandbox)
docker-compose up -d kafka zookeeper

# Create topics
kafka-topics.sh --create --topic tweets \
  --bootstrap-server localhost:9092 \
  --partitions 6 --replication-factor 1

kafka-topics.sh --create --topic trending-topics \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1

Build and Run

# Build the application
./mvnw clean package -DskipTests

# Run with Spring profile
java -jar target/twitter-trend-analyzer.jar \
  --spring.profiles.active=local

# Or with MapR profile
java -jar target/twitter-trend-analyzer.jar \
  --spring.profiles.active=mapr

Monitoring

# Monitor input topic
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic tweets \
  --from-beginning

# Monitor trending topics output
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic trending-topics \
  --property print.key=true

Key Patterns and Best Practices

Consumer Groups and Partitioning

| Concept         | Description                                   | Benefit             |
|-----------------|-----------------------------------------------|---------------------|
| Consumer Groups | Competing consumers receive distinct messages | Horizontal scaling  |
| Partitioning    | Data distributed across instances             | Parallel processing |
| Durability      | Persistent subscriptions maintain state       | Fault tolerance     |
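The guarantee underlying these concepts: records are assigned to partitions by hashing the key, so every occurrence of a given hashtag reaches the same partition and therefore the same consumer instance and state store. Kafka's default partitioner uses murmur2; the hashCode version below only illustrates the modulo idea, not Kafka's exact hash:

```java
public class KeyPartitioner {

    // Same key always maps to the same partition; Kafka's DefaultPartitioner
    // uses murmur2 rather than String.hashCode, but the principle is identical
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```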

Stateful Processing Guidelines

// Always materialize state stores for fault tolerance
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>
    as("my-state-store")
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long())
)

// Use proper window retention
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))
    .grace(Duration.ofMinutes(1)))  // Allow late arrivals

// Configure changelog topic replication (in application.properties):
// spring.cloud.stream.kafka.streams.binder.configuration.replication.factor=3

Error Handling

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return factoryBean -> {
        // setStreamsConfiguration expects java.util.Properties, so merge
        // the handlers into the factory bean's existing configuration
        Properties props = factoryBean.getStreamsConfiguration();
        props.put(
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class);
        props.put(
            StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            DefaultProductionExceptionHandler.class);
    };
}

Performance Optimization

| Optimization    | Configuration                      | Impact                 |
|-----------------|------------------------------------|------------------------|
| Batch Size      | batch.size=16384                   | Reduce network calls   |
| Compression     | compression.type=lz4               | Lower bandwidth        |
| Buffer Memory   | buffer.memory=33554432             | Handle bursts          |
| Commit Interval | commit.interval.ms=100             | Lower latency          |
| Caching         | cache.max.bytes.buffering=10485760 | Reduce state store I/O |

Conclusion

Building real-time Twitter trend analysis with Spring Cloud Stream and Kafka Streams provides:

  • Broker Agnosticism: Same code works with Kafka, MapR-ES, and other platforms
  • Developer Productivity: Spring Boot integration reduces boilerplate
  • High Performance: Kafka Streams provides scalable stateful processing
  • Fault Tolerance: State recovery through changelog topics, plus optional exactly-once processing guarantees

The key insight is that Spring Cloud Stream abstracts messaging complexity while Kafka Streams provides powerful stateful operations, enabling developers to focus on business logic rather than infrastructure concerns.


Explore the complete implementation at SpringCloudKafkaStreamTwitterTrend on GitHub.