Embedded Kafka KSQL: Building Stream Processing Applications with In-Process KSQL
Learn how to embed KSQL stream processing capabilities directly within Java applications. Build real-time analytics, event processing, and stream transformations without a standalone KSQL server.
Introduction
Stream processing has become the backbone of modern real-time analytics and event-driven architectures. While Apache Kafka provides the foundational event streaming platform, KSQL (now known as ksqlDB) adds a powerful SQL-like abstraction for stream processing without writing Java or Scala code.
But what if you want to embed KSQL capabilities directly within your Java application? What if you need stream processing without deploying a separate KSQL server?
This is where Embedded KSQL comes into play. By embedding KSQL within your application, you can leverage SQL-based stream processing while maintaining full control over deployment and integration with your existing systems.
Key Insight: Embedded KSQL lets developers write stream processing logic in familiar SQL syntax while keeping everything within a single deployable artifact, which makes it a natural fit for microservices and cloud-native applications.
Understanding KSQL Architecture
What is KSQL?
KSQL is a streaming SQL engine for Apache Kafka that provides:
| Feature | Description |
|---|---|
| Stream Processing | Continuous queries over Kafka topics |
| SQL Interface | Familiar SQL syntax for transformations |
| State Management | Built-in state stores for aggregations |
| Windowing | Time-based grouping of events |
| Joins | Stream-stream and stream-table joins |
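For a first taste of that SQL interface, a continuous "push" query looks like this (it assumes an `orders` stream like the one registered later in this article):

```sql
-- Push query: emits a row for every new matching event, indefinitely
SELECT orderId, product, price
FROM orders
WHERE price > 100
EMIT CHANGES;
```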
Embedded vs Standalone KSQL
| Aspect | Standalone KSQL | Embedded KSQL |
|---|---|---|
| Deployment | Separate server cluster | Within your application |
| Scaling | Independent scaling | Scales with application |
| Management | Additional infrastructure | Single artifact |
| Use Case | Enterprise-wide platform | Application-specific processing |
Project Setup
Prerequisites
Ensure you have the following tools installed:
- Java 17 or higher (the code samples use text blocks, which require Java 15+)
- Maven 3.x
- Docker and Docker Compose
Project Structure
kafka-ksql/
├── src/
│ └── main/
│ ├── java/
│ │ └── com/
│ │ └── gonnect/
│ │ └── ksql/
│ │ ├── EmbeddedKsqlQuerying.java
│ │ ├── OrderProducer.java
│ │ ├── config/
│ │ │ └── KsqlConfiguration.java
│ │ └── model/
│ │ └── Order.java
│ └── resources/
│ └── application.properties
├── docker-compose.yml
├── pom.xml
└── README.md
Maven Dependencies
<dependencies>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<!-- Kafka Streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.4.0</version>
</dependency>
<!-- KSQL Engine -->
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-engine</artifactId>
<version>0.28.2</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
</dependencies>
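Note that `ksqldb-engine` is published to Confluent's Maven repository rather than Maven Central, so the POM also needs a repositories entry:

```xml
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
```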
Infrastructure Setup
Docker Compose Configuration
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
schema-registry:
image: confluentinc/cp-schema-registry:7.4.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
Starting the Infrastructure
# Start Kafka and related services
docker-compose up -d
# Wait for services to be ready
sleep 30
# Create the orders topic
docker exec kafka kafka-topics --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic orders_topic
# Verify topic creation
docker exec kafka kafka-topics --list \
--bootstrap-server localhost:9092
Domain Model
Order Event
package com.gonnect.ksql.model;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Order {
@JsonProperty("orderId")
private String orderId;
@JsonProperty("customerId")
private String customerId;
@JsonProperty("product")
private String product;
@JsonProperty("quantity")
private int quantity;
@JsonProperty("price")
private double price;
@JsonProperty("timestamp")
private long timestamp;
@JsonProperty("region")
private String region;
// Default constructor for JSON deserialization
public Order() {}
public Order(String orderId, String customerId, String product,
int quantity, double price, String region) {
this.orderId = orderId;
this.customerId = customerId;
this.product = product;
this.quantity = quantity;
this.price = price;
this.region = region;
this.timestamp = System.currentTimeMillis();
}
// Getters and setters
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) {
this.customerId = customerId;
}
public String getProduct() { return product; }
public void setProduct(String product) { this.product = product; }
public int getQuantity() { return quantity; }
public void setQuantity(int quantity) { this.quantity = quantity; }
public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public String getRegion() { return region; }
public void setRegion(String region) { this.region = region; }
public double getTotalAmount() {
return quantity * price;
}
@Override
public String toString() {
return String.format(
"Order{orderId='%s', customerId='%s', product='%s', " +
"quantity=%d, price=%.2f, region='%s'}",
orderId, customerId, product, quantity, price, region);
}
}
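For reference, here is the JSON wire format this model serializes to, the same shape that the CREATE STREAM definition later maps column by column. The sketch below hand-rolls the JSON in a hypothetical `OrderJson` helper purely so it runs without Jackson on the classpath; in the project itself, Jackson's `writeValueAsString` produces equivalent output:

```java
import java.util.Locale;

public class OrderJson {

    // Hand-rolled JSON matching the Order fields above; Jackson produces
    // the same shape from the Order class in the real project.
    public static String toJson(String orderId, String customerId,
                                String product, int quantity, double price,
                                long timestamp, String region) {
        return String.format(Locale.ROOT,
            "{\"orderId\":\"%s\",\"customerId\":\"%s\",\"product\":\"%s\","
            + "\"quantity\":%d,\"price\":%.2f,\"timestamp\":%d,\"region\":\"%s\"}",
            orderId, customerId, product, quantity, price, timestamp, region);
    }

    public static void main(String[] args) {
        // One sample event as it would land on orders_topic
        System.out.println(toJson(
            "ORD-1", "CUST-42", "Laptop", 2, 999.5, 1700000000000L, "NORTH"));
    }
}
```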
Embedded KSQL Implementation
KSQL Configuration
package com.gonnect.ksql.config;
import io.confluent.ksql.util.KsqlConfig;
import org.apache.kafka.streams.StreamsConfig;
import java.util.HashMap;
import java.util.Map;
public class KsqlConfiguration {
public static Map<String, Object> getKsqlConfig() {
Map<String, Object> config = new HashMap<>();
// Kafka connection
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
// Application ID
config.put(StreamsConfig.APPLICATION_ID_CONFIG,
"embedded-ksql-app");
// State store directory
config.put(StreamsConfig.STATE_DIR_CONFIG,
"/tmp/ksql-state");
// Processing guarantees
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// KSQL specific configuration
config.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG,
"embedded_ksql_");
config.put(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG,
"query_");
// Schema registry (optional but recommended)
config.put("schema.registry.url",
"http://localhost:8081");
return config;
}
}
Embedded KSQL Querying
package com.gonnect.ksql;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.ServiceContextFactory;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class EmbeddedKsqlQuerying {
private final KsqlConfig ksqlConfig;
private final ServiceContext serviceContext;
private final KsqlEngine ksqlEngine;
public EmbeddedKsqlQuerying(Map<String, Object> config) {
this.ksqlConfig = new KsqlConfig(config);
// ServiceContextFactory wires up the Kafka admin/producer/consumer
// clients the engine needs; its overloads vary by ksqlDB version.
this.serviceContext = ServiceContextFactory.create(ksqlConfig, () -> null);
this.ksqlEngine = createKsqlEngine();
}
private KsqlEngine createKsqlEngine() {
// KsqlEngine and its collaborators are internal ksqlDB classes, so the
// exact constructor arguments differ between releases; check the
// signatures for the version on your classpath.
MutableMetaStore metaStore =
new MetaStoreImpl(new InternalFunctionRegistry());
return new KsqlEngine(
serviceContext,
ksqlConfig,
metaStore,
new InternalFunctionRegistry()
);
}
/**
* Create a KSQL stream from a Kafka topic
*/
public void createOrdersStream() {
String createStreamSql = """
CREATE STREAM orders (
orderId VARCHAR KEY,
customerId VARCHAR,
product VARCHAR,
quantity INT,
price DOUBLE,
timestamp BIGINT,
region VARCHAR
) WITH (
KAFKA_TOPIC = 'orders_topic',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'timestamp'
)
""";
executeStatement(createStreamSql);
System.out.println("Created 'orders' stream");
}
/**
* Create derived streams and tables using KSQL
*/
public void createDerivedStreams() {
// High-value orders stream
String highValueOrders = """
CREATE STREAM high_value_orders AS
SELECT
orderId,
customerId,
product,
quantity,
price,
(quantity * price) AS total_amount,
region
FROM orders
WHERE (quantity * price) > 1000
EMIT CHANGES
""";
executeStatement(highValueOrders);
System.out.println("Created 'high_value_orders' stream");
// Orders per region aggregation
String ordersPerRegion = """
CREATE TABLE orders_per_region AS
SELECT
region,
COUNT(*) AS order_count,
SUM(quantity * price) AS total_revenue
FROM orders
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY region
EMIT CHANGES
""";
executeStatement(ordersPerRegion);
System.out.println("Created 'orders_per_region' table");
}
/**
* Create real-time analytics queries
*/
public void createAnalyticsQueries() {
// Top products by revenue
String topProducts = """
CREATE TABLE top_products AS
SELECT
product,
COUNT(*) AS order_count,
SUM(quantity) AS total_quantity,
SUM(quantity * price) AS total_revenue,
AVG(price) AS avg_price
FROM orders
WINDOW TUMBLING (SIZE 15 MINUTES)
GROUP BY product
EMIT CHANGES
""";
executeStatement(topProducts);
System.out.println("Created 'top_products' analytics table");
// Customer spending patterns
String customerSpending = """
CREATE TABLE customer_spending AS
SELECT
customerId,
COUNT(*) AS total_orders,
SUM(quantity * price) AS lifetime_value,
MAX(quantity * price) AS largest_order
FROM orders
GROUP BY customerId
EMIT CHANGES
""";
executeStatement(customerSpending);
System.out.println("Created 'customer_spending' table");
}
/**
* Execute a KSQL statement
*/
private void executeStatement(String sql) {
// parse() may yield several statements; prepare and execute each one.
List<ParsedStatement> statements = ksqlEngine.parse(sql);
for (ParsedStatement parsed : statements) {
PreparedStatement<?> prepared = ksqlEngine.prepare(parsed);
ksqlEngine.execute(
serviceContext,
ConfiguredStatement.of(
prepared,
SessionConfig.of(ksqlConfig, Collections.emptyMap())));
}
}
/**
* List all running persistent queries
*/
public void listQueries() {
List<PersistentQueryMetadata> queries =
ksqlEngine.getPersistentQueries();
System.out.println("\nRunning KSQL Queries:");
System.out.println("-".repeat(50));
for (PersistentQueryMetadata query : queries) {
System.out.printf("Query ID: %s%n", query.getQueryId());
System.out.printf(" SQL: %s%n", query.getStatementString());
System.out.printf(" State: %s%n", query.getState());
System.out.println();
}
}
/**
* Gracefully shutdown the KSQL engine
*/
public void shutdown() {
System.out.println("Shutting down KSQL engine...");
ksqlEngine.close();
}
public static void main(String[] args) {
Map<String, Object> config = KsqlConfiguration.getKsqlConfig();
EmbeddedKsqlQuerying ksql = new EmbeddedKsqlQuerying(config);
try {
// Create the base orders stream
ksql.createOrdersStream();
// Create derived streams and tables
ksql.createDerivedStreams();
// Create analytics queries
ksql.createAnalyticsQueries();
// List all running queries
ksql.listQueries();
System.out.println("\nEmbedded KSQL is running.");
System.out.println("Press Ctrl+C to stop...");
// Keep the application running
Thread.currentThread().join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
ksql.shutdown();
}
}
}
Producing Test Data
Order Producer
package com.gonnect.ksql;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.gonnect.ksql.model.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class OrderProducer {
private static final String TOPIC = "orders_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final ObjectMapper mapper = new ObjectMapper();
private static final String[] PRODUCTS = {
"Laptop", "Phone", "Tablet", "Headphones", "Camera",
"Watch", "Speaker", "Monitor", "Keyboard", "Mouse"
};
private static final String[] REGIONS = {
"NORTH", "SOUTH", "EAST", "WEST", "CENTRAL"
};
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
try (Producer<String, String> producer =
new KafkaProducer<>(props)) {
Random random = new Random();
for (int i = 0; i < 100; i++) {
Order order = new Order(
"ORD-" + UUID.randomUUID().toString().substring(0, 8),
"CUST-" + random.nextInt(1000),
PRODUCTS[random.nextInt(PRODUCTS.length)],
random.nextInt(10) + 1,
50 + random.nextDouble() * 950,
REGIONS[random.nextInt(REGIONS.length)]
);
String value = mapper.writeValueAsString(order);
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC, order.getOrderId(), value);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error: " + exception.getMessage());
} else {
System.out.printf(
"Sent order %s to partition %d offset %d%n",
order.getOrderId(),
metadata.partition(),
metadata.offset());
}
});
TimeUnit.MILLISECONDS.sleep(100);
}
}
System.out.println("Finished producing orders");
}
}
KSQL Query Examples
Stream Transformations
-- Filter orders by region
SELECT * FROM orders
WHERE region = 'NORTH'
EMIT CHANGES;
-- Calculate running totals
SELECT
customerId,
SUM(quantity * price) AS running_total
FROM orders
GROUP BY customerId
EMIT CHANGES;
-- Join orders with customer data
SELECT
o.orderId,
o.product,
c.customerName,
c.tier
FROM orders o
INNER JOIN customers c ON o.customerId = c.customerId
EMIT CHANGES;
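The join above assumes a `customers` table has already been registered. A sketch of what that definition could look like (the backing topic name `customers_topic` is an assumption, not part of the project):

```sql
-- Table keyed by customerId; the stream-table join looks up the
-- latest row for each key.
CREATE TABLE customers (
    customerId VARCHAR PRIMARY KEY,
    customerName VARCHAR,
    tier VARCHAR
) WITH (
    KAFKA_TOPIC = 'customers_topic',
    VALUE_FORMAT = 'JSON'
);
```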
Windowed Aggregations
-- Orders per minute
SELECT
region,
COUNT(*) AS orders_per_minute
FROM orders
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY region
EMIT CHANGES;
-- Moving average price per product
SELECT
product,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price
FROM orders
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY product
EMIT CHANGES;
Complex Event Processing
-- Detect unusual order patterns
CREATE STREAM unusual_orders AS
SELECT *
FROM orders
WHERE quantity > 50
OR (quantity * price) > 5000
EMIT CHANGES;
-- Session-based customer activity
SELECT
customerId,
COUNT(*) AS session_orders,
SUM(quantity * price) AS session_value
FROM orders
WINDOW SESSION (30 MINUTES)
GROUP BY customerId
EMIT CHANGES;
Monitoring and Observability
Query Metrics
public void printQueryMetrics() {
// Metric and MetricName come from org.apache.kafka.common; each
// persistent query exposes its underlying Kafka Streams metrics.
for (PersistentQueryMetadata query : ksqlEngine.getPersistentQueries()) {
Map<MetricName, ? extends Metric> metrics =
query.getKafkaStreams().metrics();
System.out.printf("Query: %s%n", query.getQueryId());
System.out.printf("  Records processed: %s%n",
getMetricValue(metrics, "process-total"));
System.out.printf("  Processing rate: %s records/s%n",
getMetricValue(metrics, "process-rate"));
}
}
private static Object getMetricValue(
Map<MetricName, ? extends Metric> metrics, String name) {
return metrics.entrySet().stream()
.filter(e -> e.getKey().name().equals(name))
.map(e -> e.getValue().metricValue())
.findFirst()
.orElse("n/a");
}
JMX Metrics
# Kafka Streams property: record task- and processor-level metrics
metrics.recording.level=DEBUG
# JVM flags to expose the metrics over JMX
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=9999
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
Best Practices
Query Design
| Practice | Description |
|---|---|
| Filter Early | Apply WHERE clauses before joins to reduce data volume |
| Use Appropriate Windows | Choose window size based on business requirements |
| Avoid Large State | Be mindful of GROUP BY cardinality |
| Key Selection | Choose effective partition keys for parallelism |
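As an illustration of "Filter Early", a pre-filtered stream can feed later joins and aggregations so that only relevant events reach the expensive operation (`north_orders` is a hypothetical name):

```sql
-- Narrow the stream first; downstream joins and aggregations
-- then process only NORTH-region events.
CREATE STREAM north_orders AS
    SELECT * FROM orders
    WHERE region = 'NORTH';
```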
Performance Tuning
// Increase buffer sizes for high throughput
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
10 * 1024 * 1024L); // 10 MB
// Adjust commit interval
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// Configure state store
config.put(StreamsConfig.STATE_DIR_CONFIG, "/fast-ssd/ksql-state");
Conclusion
Embedded KSQL provides a powerful approach to stream processing that combines:
- SQL Simplicity: Familiar SQL syntax for stream processing
- Deployment Flexibility: Single artifact without external KSQL server
- Full Integration: Direct access to Kafka Streams APIs when needed
- Real-Time Analytics: Continuous queries for instant insights
This approach is ideal for:
- Microservices requiring self-contained stream processing
- Applications with specific processing needs
- Environments where external KSQL server is impractical
- Use cases requiring tight integration with application code
The kafka-ksql project demonstrates that sophisticated stream processing can be achieved within your application, bringing the power of KSQL directly to your codebase.