Real-Time Twitter Trend Analysis with Spring Cloud Stream and Kafka
Build a scalable event streaming pipeline for Twitter trend analysis using Spring Cloud Stream, Kafka Streams, and MapR-ES. Master KStream, KTable, and processor patterns for real-time analytics.
Introduction
Social media platforms generate massive volumes of real-time data that hold valuable insights - trending topics, sentiment shifts, and emerging conversations. Capturing and analyzing this data at scale requires a robust event streaming architecture that can handle millions of events per second.
This project demonstrates building a Twitter trend analysis pipeline using Spring Cloud Stream with Apache Kafka (and MapR-ES). By leveraging the abstractions provided by Spring Cloud Stream, we create a broker-agnostic streaming application that can run on various messaging platforms while maintaining high throughput and low latency.
Key Insight: Spring Cloud Stream + Kafka Streams = Developer Productivity + High Performance + Scalability. The combination provides the best of both worlds - Spring's ease of use with Kafka's streaming capabilities.
Architecture Overview
Event-Driven Architecture
System Components
┌────────────────────────────────────────────────────────────────┐
│                TWITTER TREND ANALYSIS PIPELINE                 │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
│  │  Twitter   │     │  Twitter   │     │  Twitter   │          │
│  │  Stream 1  │     │  Stream 2  │     │  Stream N  │          │
│  └─────┬──────┘     └─────┬──────┘     └─────┬──────┘          │
│        │                  │                  │                 │
│        └──────────────────┼──────────────────┘                 │
│                           │                                    │
│                  ┌────────▼────────┐                           │
│                  │  Kafka/MapR-ES  │                           │
│                  │  (tweets topic) │                           │
│                  └────────┬────────┘                           │
│                           │                                    │
│        ┌──────────────────┼──────────────────┐                 │
│        │                  │                  │                 │
│  ┌─────▼──────┐     ┌─────▼──────┐     ┌─────▼──────┐          │
│  │  Hashtag   │     │  Sentiment │     │    User    │          │
│  │ Processor  │     │  Analyzer  │     │ Influence  │          │
│  └─────┬──────┘     └─────┬──────┘     └─────┬──────┘          │
│        │                  │                  │                 │
│  ┌─────▼──────┐     ┌─────▼──────┐     ┌─────▼──────┐          │
│  │  Trending  │     │  Sentiment │     │ Influencer │          │
│  │   Topics   │     │  Dashboard │     │  Rankings  │          │
│  └────────────┘     └────────────┘     └────────────┘          │
│                                                                │
└────────────────────────────────────────────────────────────────┘
Technology Stack
| Component | Technology | Purpose |
|---|---|---|
| Messaging | Apache Kafka / MapR-ES | Event streaming backbone |
| Framework | Spring Cloud Stream | Broker-agnostic streaming |
| Processing | Kafka Streams | KStream/KTable processing |
| Language | Java 17+ | Application development |
Spring Cloud Stream Fundamentals
Core Concepts
Destination Binders: Abstraction responsible for providing integration with external messaging systems. This allows the same application code to work with Kafka, RabbitMQ, or other brokers.
Destination Bindings: Bridge between external message brokers and application-provided channels, declared through the @EnableBinding annotation. Note that the annotation-based model used throughout this project was deprecated in Spring Cloud Stream 3.1 in favor of the functional programming model and removed in 4.0, so the examples here target the 3.x line.
Message Channels:
- Source (Producer): Sends messages to destinations
- Sink (Consumer): Receives messages from destinations
- Processor: Combines sink and source contracts
Programming Model
// Channel imports (shown once for the interfaces below)
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

// Source - produces messages
public interface TweetSource {
    String OUTPUT = "tweets-out";

    @Output(OUTPUT)
    MessageChannel output();
}

// Sink - consumes messages
public interface TweetSink {
    String INPUT = "tweets-in";

    @Input(INPUT)
    SubscribableChannel input();
}

// Processor - consumes and produces
public interface TweetProcessor extends TweetSource, TweetSink {
    // Inherits both input and output channels
}
Implementation Deep Dive
Tweet Domain Model
package com.gonnect.twitter.model;

import java.time.Instant;
import java.util.List;

public record Tweet(
        String id,
        String text,
        String userId,
        String username,
        List<String> hashtags,
        List<String> mentions,
        int retweetCount,
        int likeCount,
        int followerCount,
        String language,
        Instant createdAt
) {
    public boolean isRetweet() {
        return text.startsWith("RT @");
    }

    public boolean hasHashtags() {
        return hashtags != null && !hashtags.isEmpty();
    }
}

// TrendingTopic.java (each public record lives in its own file)
public record TrendingTopic(
        String hashtag,
        long count,
        long windowStart,
        long windowEnd
) {}

// UserInfluence.java
public record UserInfluence(
        String userId,
        String username,
        long totalEngagement,
        int tweetCount
) {}
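The hashtags list is assumed to be populated upstream (the Twitter API delivers hashtags as entities). For testing without the API, a small extractor can derive them from the tweet text; HashtagExtractor below is an illustrative helper under that assumption, not part of the project:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HashtagExtractor {
    // Matches '#' followed by letters, digits, or underscores
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    // Returns lower-cased hashtags in order of appearance, matching the
    // lower-casing the hashtag processor applies before grouping
    public static List<String> extract(String text) {
        Matcher m = HASHTAG.matcher(text);
        List<String> tags = new ArrayList<>();
        while (m.find()) {
            tags.add(m.group(1).toLowerCase());
        }
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(extract("RT @dev: loving #Kafka and #SpringBoot!"));
        // [kafka, springboot]
    }
}
```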
Tweet Ingestion Source
package com.gonnect.twitter.source;

import com.gonnect.twitter.model.Tweet;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;

@SpringBootApplication
@EnableBinding(Source.class)
public class TweetSourceApplication {

    // Client wrapping the Twitter streaming API (implementation not shown)
    private final TwitterClient twitterClient;

    public TweetSourceApplication(TwitterClient twitterClient) {
        this.twitterClient = twitterClient;
    }

    public static void main(String[] args) {
        SpringApplication.run(TweetSourceApplication.class, args);
    }

    // Polls every 100 ms, emitting up to 10 tweets per poll to the output channel
    @InboundChannelAdapter(
            value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "100", maxMessagesPerPoll = "10")
    )
    public Tweet fetchTweet() {
        return twitterClient.getNextTweet();
    }
}
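TwitterClient is injected above but never defined in the article. A minimal sketch of the contract, plus an in-memory stub for exercising the source without Twitter credentials — all names here are assumptions, and the real client would return Tweet objects rather than the raw strings used to keep the sketch self-contained:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical contract the source application polls; a real implementation
// would wrap the Twitter streaming API and return Tweet objects.
interface TwitterClient {
    String getNextTweet(); // returns null when nothing is available
}

// In-memory stub: replays a canned list, then signals exhaustion with null
class StubTwitterClient implements TwitterClient {
    private final Iterator<String> tweets;

    StubTwitterClient(List<String> canned) {
        this.tweets = canned.iterator();
    }

    @Override
    public String getNextTweet() {
        return tweets.hasNext() ? tweets.next() : null;
    }
}

public class TwitterClientSketch {
    public static void main(String[] args) {
        TwitterClient client = new StubTwitterClient(List.of("hello #kafka"));
        System.out.println(client.getNextTweet()); // hello #kafka
        System.out.println(client.getNextTweet()); // null
    }
}
```

Returning null on exhaustion matches @InboundChannelAdapter semantics: a null poll result simply produces no message.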
Hashtag Trend Processor
package com.gonnect.twitter.processor;

import com.gonnect.twitter.model.TrendingTopic;
import com.gonnect.twitter.model.Tweet;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.messaging.handler.annotation.SendTo;

import java.time.Duration;

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class HashtagTrendProcessor {

    public static void main(String[] args) {
        SpringApplication.run(HashtagTrendProcessor.class, args);
    }

    @StreamListener("input")
    @SendTo("output")
    public KStream<String, TrendingTopic> processHashtags(
            KStream<String, Tweet> tweetsStream) {

        return tweetsStream
                // Filter tweets with hashtags
                .filter((key, tweet) -> tweet.hasHashtags())
                // Flatten hashtags - one record per hashtag
                .flatMapValues(tweet -> tweet.hashtags())
                // Re-key by hashtag for grouping
                .map((key, hashtag) -> new KeyValue<>(
                        hashtag.toLowerCase(),
                        hashtag
                ))
                // Group by hashtag
                .groupByKey(Grouped.with(
                        Serdes.String(),
                        Serdes.String()
                ))
                // Apply tumbling window of 5 minutes
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                // Count occurrences
                .count(Materialized.as("hashtag-counts-store"))
                // Convert to output stream
                .toStream()
                // Transform to TrendingTopic
                .map((windowedKey, count) -> new KeyValue<>(
                        windowedKey.key(),
                        new TrendingTopic(
                                windowedKey.key(),
                                count,
                                windowedKey.window().start(),
                                windowedKey.window().end()
                        )
                ))
                // Filter to only significant trends (>= 10 occurrences)
                .filter((key, topic) -> topic.count() >= 10);
    }
}
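The tumbling windows used above are aligned to the Unix epoch, so a record's window follows directly from its timestamp. A standalone illustration of the 5-minute window assignment:

```java
public class TumblingWindowMath {
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L; // 5 minutes, matching TimeWindows.of

    // Epoch-aligned window start for a record timestamp
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        long ts = 1_700_000_123_456L;
        // Window [1700000100000, 1700000400000) for this timestamp
        System.out.println(windowStart(ts) + " .. " + windowEnd(ts));
    }
}
```

This is why two tweets four minutes apart may land in different windows: assignment depends on where the epoch-aligned boundary falls, not on the gap between records.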
Sentiment Analysis Processor
package com.gonnect.twitter.processor;

import com.gonnect.twitter.model.Tweet;
import org.apache.kafka.streams.kstream.*;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.List;

@Component
@EnableBinding(KafkaStreamsProcessor.class)
public class SentimentAnalysisProcessor {

    // Project-provided sentiment scoring service (implementation not shown)
    private final SentimentAnalyzer sentimentAnalyzer;

    public SentimentAnalysisProcessor(SentimentAnalyzer sentimentAnalyzer) {
        this.sentimentAnalyzer = sentimentAnalyzer;
    }

    @StreamListener("input")
    @SendTo("output")
    public KStream<String, SentimentResult> analyzeSentiment(
            KStream<String, Tweet> tweetsStream) {

        return tweetsStream
                // Analyze sentiment for each tweet
                .mapValues(tweet -> {
                    double score = sentimentAnalyzer.analyze(tweet.text());
                    String sentiment = categorizeSentiment(score);
                    return new SentimentResult(
                            tweet.id(),
                            tweet.text(),
                            score,
                            sentiment,
                            tweet.hashtags(),
                            tweet.createdAt()
                    );
                })
                // Re-key by sentiment category for downstream aggregation
                .selectKey((key, result) -> result.sentiment());
    }

    private String categorizeSentiment(double score) {
        if (score > 0.3) return "positive";
        if (score < -0.3) return "negative";
        return "neutral";
    }
}

record SentimentResult(
        String tweetId,
        String text,
        double score,
        String sentiment,
        List<String> hashtags,
        Instant createdAt
) {}
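The ±0.3 thresholds leave the boundary values themselves in the neutral bucket, which is easy to get wrong when reimplementing. A standalone copy of the categorization logic pins the behavior down:

```java
public class SentimentCategorizer {
    // Mirrors categorizeSentiment(): strictly greater than 0.3 is positive,
    // strictly less than -0.3 is negative, everything else (including the
    // boundaries themselves) is neutral.
    static String categorize(double score) {
        if (score > 0.3) return "positive";
        if (score < -0.3) return "negative";
        return "neutral";
    }

    public static void main(String[] args) {
        System.out.println(categorize(0.3));  // neutral - the boundary is not positive
        System.out.println(categorize(0.31)); // positive
        System.out.println(categorize(-0.5)); // negative
    }
}
```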
User Influence Aggregator
package com.gonnect.twitter.processor;

import com.gonnect.twitter.model.Tweet;
import com.gonnect.twitter.model.UserInfluence;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(KafkaStreamsProcessor.class)
public class UserInfluenceAggregator {

    @StreamListener("input")
    @SendTo("output")
    public KStream<String, UserInfluence> aggregateInfluence(
            KStream<String, Tweet> tweetsStream) {

        return tweetsStream
                // Group by user
                .groupBy(
                        (key, tweet) -> tweet.userId(),
                        Grouped.with(Serdes.String(), new JsonSerde<>(Tweet.class))
                )
                // Aggregate user metrics
                .aggregate(
                        // Initializer
                        () -> new UserInfluenceAccumulator(),
                        // Aggregator
                        (userId, tweet, accumulator) -> {
                            accumulator.addTweet(tweet);
                            return accumulator;
                        },
                        // Materialized store
                        Materialized.<String, UserInfluenceAccumulator, KeyValueStore<Bytes, byte[]>>as("user-influence-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(new JsonSerde<>(UserInfluenceAccumulator.class))
                )
                // Convert to stream
                .toStream()
                // Map to output format
                .mapValues(accumulator -> new UserInfluence(
                        accumulator.getUserId(),
                        accumulator.getUsername(),
                        accumulator.getTotalEngagement(),
                        accumulator.getTweetCount()
                ))
                // Filter to only influential users
                .filter((userId, influence) -> influence.totalEngagement() > 100);
    }
}

class UserInfluenceAccumulator {
    private String userId;
    private String username;
    private long totalEngagement = 0;
    private int tweetCount = 0;

    public void addTweet(Tweet tweet) {
        this.userId = tweet.userId();
        this.username = tweet.username();
        this.totalEngagement += tweet.retweetCount() + tweet.likeCount();
        this.tweetCount++;
    }

    public String getUserId() { return userId; }
    public String getUsername() { return username; }
    public long getTotalEngagement() { return totalEngagement; }
    public int getTweetCount() { return tweetCount; }
}
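The aggregate call above is just a keyed fold: the initializer creates an empty accumulator and the aggregator merges each tweet into it. The same fold in plain Java, using a trimmed stand-in for the Tweet record (an assumption made to keep the sketch self-contained):

```java
import java.util.List;

public class InfluenceFoldDemo {
    // Trimmed stand-in for Tweet: only the fields the accumulator reads
    record MiniTweet(String userId, String username, int retweetCount, int likeCount) {}

    static final class Acc {
        String userId;
        String username;
        long totalEngagement;
        int tweetCount;

        void addTweet(MiniTweet t) {
            this.userId = t.userId();
            this.username = t.username();
            this.totalEngagement += t.retweetCount() + t.likeCount();
            this.tweetCount++;
        }
    }

    // Same shape as KGroupedStream.aggregate: initializer + per-record aggregator
    static Acc fold(List<MiniTweet> tweetsForOneUser) {
        Acc acc = new Acc(); // initializer
        for (MiniTweet t : tweetsForOneUser) {
            acc.addTweet(t); // aggregator
        }
        return acc;
    }

    public static void main(String[] args) {
        Acc acc = fold(List.of(
                new MiniTweet("u1", "alice", 10, 5),
                new MiniTweet("u1", "alice", 80, 20)));
        // 115 engagement over 2 tweets -> passes the > 100 influence filter
        System.out.println(acc.totalEngagement + " over " + acc.tweetCount + " tweets");
    }
}
```

In the real topology Kafka Streams runs this fold per key and checkpoints the accumulator in the materialized store, so it survives restarts.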
Branching by Language
package com.gonnect.twitter.processor;

import com.gonnect.twitter.model.Tweet;
import org.apache.kafka.streams.kstream.*;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(MultiOutputProcessor.class)
public class LanguageBranchingProcessor {

    @StreamListener("input")
    @SendTo({"englishOutput", "spanishOutput", "otherOutput"})
    @SuppressWarnings("unchecked")
    public KStream<String, Tweet>[] branchByLanguage(
            KStream<String, Tweet> tweetsStream) {

        // Define predicates for each language
        Predicate<String, Tweet> isEnglish = (key, tweet) ->
                "en".equals(tweet.language());
        Predicate<String, Tweet> isSpanish = (key, tweet) ->
                "es".equals(tweet.language());
        Predicate<String, Tweet> isOther = (key, tweet) ->
                !"en".equals(tweet.language()) && !"es".equals(tweet.language());

        // Branch the stream
        return tweetsStream.branch(isEnglish, isSpanish, isOther);
    }
}

interface MultiOutputProcessor {
    @Input("input")
    KStream<String, Tweet> input();

    @Output("englishOutput")
    KStream<String, Tweet> englishOutput();

    @Output("spanishOutput")
    KStream<String, Tweet> spanishOutput();

    @Output("otherOutput")
    KStream<String, Tweet> otherOutput();
}
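branch routes each record to the first predicate that matches and silently drops records that match none, which is why the explicit isOther catch-all matters. The routing semantics in plain Java:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BranchSemanticsDemo {
    // KStream.branch evaluates predicates in order: each record lands in the
    // branch of the FIRST matching predicate; non-matching records are dropped.
    static List<List<String>> branch(List<String> langs, List<Predicate<String>> predicates) {
        List<List<String>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) branches.add(new ArrayList<>());
        for (String lang : langs) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(lang)) {
                    branches.get(i).add(lang); // first match wins
                    break;
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<List<String>> out = branch(
                List.of("en", "es", "fr", "en"),
                List.<Predicate<String>>of("en"::equals, "es"::equals, l -> true));
        System.out.println(out); // [[en, en], [es], [fr]]
    }
}
```

Without the catch-all third predicate, the "fr" tweet would vanish from the topology entirely.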
Configuration
Application Properties
# application.yml
spring:
  application:
    name: twitter-trend-analyzer
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
  cloud:
    stream:
      bindings:
        input:
          destination: tweets
          group: trend-analyzer
          consumer:
            useNativeDecoding: true
        output:
          destination: trending-topics
          producer:
            useNativeEncoding: true
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            applicationId: twitter-trend-analyzer
            configuration:
              commit.interval.ms: 1000
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          bindings:
            input:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: com.gonnect.twitter.serde.TweetSerde
            output:
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: com.gonnect.twitter.serde.TrendingTopicSerde

# Custom properties read by the processor code (not binder keys)
time-window:
  length: 300000    # 5 minutes in milliseconds
  advanceBy: 60000  # Advance by 1 minute (hopping window)

logging:
  level:
    org.apache.kafka.streams: INFO
    org.springframework.cloud.stream: DEBUG
MapR-ES Specific Configuration
# application-mapr.yml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: ${MAPR_KAFKA_BROKER:mapr-cluster:9092}
            configuration:
              # MapR-ES uses stream paths instead of topic names
              # Topics are prefixed with stream path: /mapr/stream:topic
              streams.topic.prefix: /mapr/twitter-stream:
Running the Application
Prerequisites
# Start Kafka (or MapR sandbox)
docker-compose up -d kafka zookeeper

# Create topics
kafka-topics.sh --create --topic tweets \
  --bootstrap-server localhost:9092 \
  --partitions 6 --replication-factor 1

kafka-topics.sh --create --topic trending-topics \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1
Build and Run
# Build the application
./mvnw clean package -DskipTests

# Run with Spring profile
java -jar target/twitter-trend-analyzer.jar \
  --spring.profiles.active=local

# Or with MapR profile
java -jar target/twitter-trend-analyzer.jar \
  --spring.profiles.active=mapr
Monitoring
# Monitor input topic
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic tweets \
  --from-beginning

# Monitor trending topics output
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic trending-topics \
  --property print.key=true
Key Patterns and Best Practices
Consumer Groups and Partitioning
| Concept | Description | Benefit |
|---|---|---|
| Consumer Groups | Competing consumers receive distinct messages | Horizontal scaling |
| Partitioning | Data distributed across instances | Parallel processing |
| Durability | Persistent subscriptions maintain state | Fault tolerance |
Stateful Processing Guidelines
// Always materialize state stores for fault tolerance
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-state-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()))

// Use proper window retention
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))
        .grace(Duration.ofMinutes(1))) // Allow late arrivals

// Configure changelog topic replication (application property):
// spring.cloud.stream.kafka.streams.binder.configuration.replication.factor=3
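With a 1-minute grace period, a late record is still counted as long as observed stream time has not passed the window end plus the grace. The acceptance condition, shown in isolation:

```java
public class GracePeriodDemo {
    static final long WINDOW_MS = 5 * 60 * 1000L; // matches TimeWindows.of(5 min)
    static final long GRACE_MS = 60 * 1000L;      // matches grace(1 min)

    // A record for an (epoch-aligned) window is accepted while stream time
    // has not advanced past windowEnd + grace; after that the window is closed
    // and late records are dropped.
    static boolean accepted(long recordTs, long streamTime) {
        long windowEnd = recordTs - (recordTs % WINDOW_MS) + WINDOW_MS;
        return streamTime < windowEnd + GRACE_MS;
    }

    public static void main(String[] args) {
        // Record belongs to window [0, 300000); the window closes at 360000
        System.out.println(accepted(10_000, 350_000)); // true: within grace
        System.out.println(accepted(10_000, 370_000)); // false: window closed
    }
}
```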
Error Handling
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return factoryBean -> {
        // setStreamsConfiguration expects java.util.Properties, not a Map
        Properties props = new Properties();
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class);
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                DefaultProductionExceptionHandler.class);
        factoryBean.setStreamsConfiguration(props);
    };
}
Performance Optimization
| Optimization | Configuration | Impact |
|---|---|---|
| Batch Size | batch.size=16384 | Reduce network calls |
| Compression | compression.type=lz4 | Lower bandwidth |
| Buffer Memory | buffer.memory=33554432 | Handle bursts |
| Commit Interval | commit.interval.ms=100 | Lower latency |
| Caching | cache.max.bytes.buffering=10485760 | Reduce state store I/O |
Conclusion
Building real-time Twitter trend analysis with Spring Cloud Stream and Kafka Streams provides:
- Broker Agnosticism: Same code works with Kafka, MapR-ES, and other platforms
- Developer Productivity: Spring Boot integration reduces boilerplate
- High Performance: Kafka Streams provides scalable stateful processing
- Fault Tolerance: Built-in exactly-once semantics and state recovery
The key insight is that Spring Cloud Stream abstracts messaging complexity while Kafka Streams provides powerful stateful operations, enabling developers to focus on business logic rather than infrastructure concerns.
Explore the complete implementation at SpringCloudKafkaStreamTwitterTrend on GitHub.