Embedded Kafka KSQL: Building Stream Processing Applications with In-Process KSQL

Learn how to embed KSQL stream processing capabilities directly within Java applications. Build real-time analytics, event processing, and stream transformations without a standalone KSQL server.

Gonnect Team
January 14, 2024 · 11 min read
Apache Kafka · KSQL · Java · Stream Processing

Introduction

Stream processing has become the backbone of modern real-time analytics and event-driven architectures. While Apache Kafka provides the foundational event streaming platform, KSQL (now known as ksqlDB) adds a powerful SQL-like abstraction for stream processing without writing Java or Scala code.

But what if you want to embed KSQL capabilities directly within your Java application? What if you need stream processing without deploying a separate KSQL server?

This is where Embedded KSQL comes into play. By embedding KSQL within your application, you can leverage SQL-based stream processing while maintaining full control over deployment and integration with your existing systems.

Key Insight: Embedded KSQL enables developers to write stream processing logic using familiar SQL syntax while keeping everything within a single deployable artifact - perfect for microservices and cloud-native applications.

Medallion Data Architecture


Understanding KSQL Architecture

What is KSQL?

KSQL is a streaming SQL engine for Apache Kafka that provides:

  • Stream Processing: continuous queries over Kafka topics
  • SQL Interface: familiar SQL syntax for transformations
  • State Management: built-in state stores for aggregations
  • Windowing: time-based grouping of events
  • Joins: stream-stream and stream-table joins

Embedded vs Standalone KSQL

  • Deployment: standalone runs as a separate server cluster; embedded runs within your application
  • Scaling: standalone scales independently; embedded scales with the application
  • Management: standalone adds infrastructure to operate; embedded ships as a single artifact
  • Use Case: standalone suits an enterprise-wide platform; embedded suits application-specific processing

Project Setup

Prerequisites

Ensure you have the following tools installed:

  • Java 17 or higher (the code examples use text blocks, a Java 15+ feature)
  • Maven 3.x
  • Docker and Docker Compose

Project Structure

kafka-ksql/
├── src/
│   └── main/
│       ├── java/
│       │   └── com/
│       │       └── gonnect/
│       │           └── ksql/
│       │               ├── EmbeddedKsqlQuerying.java
│       │               ├── OrderProducer.java
│       │               ├── config/
│       │               │   └── KsqlConfiguration.java
│       │               └── model/
│       │                   └── Order.java
│       └── resources/
│           └── application.properties
├── docker-compose.yml
├── pom.xml
└── README.md

Maven Dependencies

<dependencies>
    <!-- Kafka Clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>

    <!-- Kafka Streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.4.0</version>
    </dependency>

    <!-- KSQL Engine: hosted in the Confluent Maven repository
         (https://packages.confluent.io/maven/); add it under <repositories> -->
    <dependency>
        <groupId>io.confluent.ksql</groupId>
        <artifactId>ksqldb-engine</artifactId>
        <version>0.28.2</version>
    </dependency>

    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.14.2</version>
    </dependency>
</dependencies>

Infrastructure Setup

Docker Compose Configuration

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092

Starting the Infrastructure

# Start Kafka and related services
docker-compose up -d

# Wait for services to be ready
sleep 30

# Create the orders topic
docker exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic orders_topic

# Verify topic creation
docker exec kafka kafka-topics --list \
  --bootstrap-server localhost:9092

Domain Model

Order Event

package com.gonnect.ksql.model;

import com.fasterxml.jackson.annotation.JsonProperty;

public class Order {

    @JsonProperty("orderId")
    private String orderId;

    @JsonProperty("customerId")
    private String customerId;

    @JsonProperty("product")
    private String product;

    @JsonProperty("quantity")
    private int quantity;

    @JsonProperty("price")
    private double price;

    @JsonProperty("timestamp")
    private long timestamp;

    @JsonProperty("region")
    private String region;

    // Default constructor for JSON deserialization
    public Order() {}

    public Order(String orderId, String customerId, String product,
                 int quantity, double price, String region) {
        this.orderId = orderId;
        this.customerId = customerId;
        this.product = product;
        this.quantity = quantity;
        this.price = price;
        this.region = region;
        this.timestamp = System.currentTimeMillis();
    }

    // Getters and setters
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }

    public String getCustomerId() { return customerId; }
    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    public String getProduct() { return product; }
    public void setProduct(String product) { this.product = product; }

    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }

    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    public String getRegion() { return region; }
    public void setRegion(String region) { this.region = region; }

    public double getTotalAmount() {
        return quantity * price;
    }

    @Override
    public String toString() {
        return String.format(
            "Order{orderId='%s', customerId='%s', product='%s', " +
            "quantity=%d, price=%.2f, region='%s'}",
            orderId, customerId, product, quantity, price, region);
    }
}
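For reference, an Order serialized by Jackson lands on orders_topic as a flat JSON object keyed by the @JsonProperty names. The sketch below hand-rolls that shape so it runs without a Jackson dependency; the helper name is illustrative, and the %.2f price formatting only approximates Jackson's double output.

```java
public class OrderJsonSketch {

    // Illustrative helper mirroring the field names that Jackson emits
    // for the Order model (orderId, customerId, product, quantity,
    // price, timestamp, region).
    static String toJson(String orderId, String customerId, String product,
                         int quantity, double price, long timestamp,
                         String region) {
        return String.format(java.util.Locale.ROOT,
            "{\"orderId\":\"%s\",\"customerId\":\"%s\",\"product\":\"%s\","
            + "\"quantity\":%d,\"price\":%.2f,\"timestamp\":%d,\"region\":\"%s\"}",
            orderId, customerId, product, quantity, price, timestamp, region);
    }

    public static void main(String[] args) {
        System.out.println(toJson("ORD-1a2b3c4d", "CUST-42", "Laptop",
            2, 799.99, 1705190400000L, "NORTH"));
    }
}
```

These field names must match the column names declared later in the CREATE STREAM statement, since the stream reads the topic with VALUE_FORMAT = 'JSON'.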

Embedded KSQL Implementation

KSQL Configuration

package com.gonnect.ksql.config;

import io.confluent.ksql.util.KsqlConfig;
import org.apache.kafka.streams.StreamsConfig;

import java.util.HashMap;
import java.util.Map;

public class KsqlConfiguration {

    public static Map<String, Object> getKsqlConfig() {
        Map<String, Object> config = new HashMap<>();

        // Kafka connection
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092");

        // Application ID
        config.put(StreamsConfig.APPLICATION_ID_CONFIG,
            "embedded-ksql-app");

        // State store directory
        config.put(StreamsConfig.STATE_DIR_CONFIG,
            "/tmp/ksql-state");

        // Processing guarantees
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
            StreamsConfig.EXACTLY_ONCE_V2);

        // KSQL specific configuration
        config.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG,
            "embedded_ksql_");

        config.put(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG,
            "query_");

        // Schema registry (optional but recommended)
        config.put("schema.registry.url",
            "http://localhost:8081");

        return config;
    }
}

Embedded KSQL Querying

package com.gonnect.ksql;

import com.gonnect.ksql.config.KsqlConfiguration;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.ServiceContextFactory;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class EmbeddedKsqlQuerying {

    private final KsqlConfig ksqlConfig;
    private final ServiceContext serviceContext;
    private final KsqlEngine ksqlEngine;

    public EmbeddedKsqlQuerying(Map<String, Object> config) {
        this.ksqlConfig = new KsqlConfig(config);
        // Note: the factory and constructor signatures below are simplified;
        // they vary between ksqlDB releases, so check the ksqldb-engine
        // sources for the version on your classpath (0.28.2 here).
        this.serviceContext = ServiceContextFactory.create(ksqlConfig,
            () -> null /* no Schema Registry client in this sketch */);
        this.ksqlEngine = createKsqlEngine();
    }

    private KsqlEngine createKsqlEngine() {
        // MetaStore is an interface; MetaStoreImpl is the concrete store,
        // seeded with ksqlDB's built-in function registry.
        MutableMetaStore metaStore =
            new MetaStoreImpl(new InternalFunctionRegistry());

        // Simplified: the real constructor also takes a ProcessingLogContext,
        // service id, query id generator, and engine metrics factory.
        return new KsqlEngine(
            serviceContext,
            ksqlConfig,
            metaStore
        );
    }

    /**
     * Create a KSQL stream from a Kafka topic
     */
    public void createOrdersStream() {
        String createStreamSql = """
            CREATE STREAM orders (
                orderId VARCHAR KEY,
                customerId VARCHAR,
                product VARCHAR,
                quantity INT,
                price DOUBLE,
                `timestamp` BIGINT,  -- backticks: TIMESTAMP is a keyword in ksqlDB
                region VARCHAR
            ) WITH (
                KAFKA_TOPIC = 'orders_topic',
                VALUE_FORMAT = 'JSON',
                TIMESTAMP = 'timestamp'
            )
            """;

        executeStatement(createStreamSql);
        System.out.println("Created 'orders' stream");
    }

    /**
     * Create derived streams and tables using KSQL
     */
    public void createDerivedStreams() {
        // High-value orders stream
        String highValueOrders = """
            CREATE STREAM high_value_orders AS
            SELECT
                orderId,
                customerId,
                product,
                quantity,
                price,
                (quantity * price) AS total_amount,
                region
            FROM orders
            WHERE (quantity * price) > 1000
            EMIT CHANGES
            """;

        executeStatement(highValueOrders);
        System.out.println("Created 'high_value_orders' stream");

        // Orders per region aggregation
        String ordersPerRegion = """
            CREATE TABLE orders_per_region AS
            SELECT
                region,
                COUNT(*) AS order_count,
                SUM(quantity * price) AS total_revenue
            FROM orders
            WINDOW TUMBLING (SIZE 1 HOUR)
            GROUP BY region
            EMIT CHANGES
            """;

        executeStatement(ordersPerRegion);
        System.out.println("Created 'orders_per_region' table");
    }

    /**
     * Create real-time analytics queries
     */
    public void createAnalyticsQueries() {
        // Top products by revenue
        String topProducts = """
            CREATE TABLE top_products AS
            SELECT
                product,
                COUNT(*) AS order_count,
                SUM(quantity) AS total_quantity,
                SUM(quantity * price) AS total_revenue,
                AVG(price) AS avg_price
            FROM orders
            WINDOW TUMBLING (SIZE 15 MINUTES)
            GROUP BY product
            EMIT CHANGES
            """;

        executeStatement(topProducts);
        System.out.println("Created 'top_products' analytics table");

        // Customer spending patterns
        String customerSpending = """
            CREATE TABLE customer_spending AS
            SELECT
                customerId,
                COUNT(*) AS total_orders,
                SUM(quantity * price) AS lifetime_value,
                MAX(quantity * price) AS largest_order
            FROM orders
            GROUP BY customerId
            EMIT CHANGES
            """;

        executeStatement(customerSpending);
        System.out.println("Created 'customer_spending' table");
    }

    /**
     * Execute a KSQL statement
     */
    private void executeStatement(String sql) {
        // Engine flow: parse() -> prepare() -> execute().
        List<ParsedStatement> statements = ksqlEngine.parse(sql);

        for (ParsedStatement parsed : statements) {
            PreparedStatement<?> prepared = ksqlEngine.prepare(parsed);
            ksqlEngine.execute(
                serviceContext,
                ConfiguredStatement.of(
                    prepared,
                    SessionConfig.of(ksqlConfig, Collections.emptyMap())
                )
            );
        }
    }

    /**
     * List all running persistent queries
     */
    public void listQueries() {
        List<PersistentQueryMetadata> queries =
            ksqlEngine.getPersistentQueries();

        System.out.println("\nRunning KSQL Queries:");
        System.out.println("-".repeat(50));

        for (PersistentQueryMetadata query : queries) {
            System.out.printf("Query ID: %s%n", query.getQueryId());
            System.out.printf("  SQL: %s%n", query.getStatementString());
            System.out.printf("  State: %s%n", query.getState());
            System.out.println();
        }
    }

    /**
     * Gracefully shutdown the KSQL engine
     */
    public void shutdown() {
        System.out.println("Shutting down KSQL engine...");
        ksqlEngine.close();
    }

    public static void main(String[] args) {
        Map<String, Object> config = KsqlConfiguration.getKsqlConfig();
        EmbeddedKsqlQuerying ksql = new EmbeddedKsqlQuerying(config);

        try {
            // Create the base orders stream
            ksql.createOrdersStream();

            // Create derived streams and tables
            ksql.createDerivedStreams();

            // Create analytics queries
            ksql.createAnalyticsQueries();

            // List all running queries
            ksql.listQueries();

            System.out.println("\nEmbedded KSQL is running.");
            System.out.println("Press Ctrl+C to stop...");

            // Keep the application running
            Thread.currentThread().join();

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ksql.shutdown();
        }
    }
}
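To make the semantics of the orders_per_region aggregation concrete, here is the same COUNT(*) and SUM(quantity * price) logic over an in-memory list, using plain Java streams. This is only an illustrative sketch: ksqlDB computes the result incrementally in a state store and within time windows, both of which the sketch omits.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RegionAggregationSketch {

    record Order(String region, int quantity, double price) {}

    // Per region: [0] = COUNT(*), [1] = SUM(quantity * price),
    // mirroring the orders_per_region table's columns.
    static Map<String, double[]> aggregate(List<Order> orders) {
        return orders.stream().collect(Collectors.toMap(
            Order::region,
            o -> new double[] {1, o.quantity() * o.price()},
            (a, b) -> new double[] {a[0] + b[0], a[1] + b[1]}));
    }

    public static void main(String[] args) {
        var orders = List.of(
            new Order("NORTH", 2, 100.0),
            new Order("NORTH", 1, 50.0),
            new Order("SOUTH", 3, 10.0));
        aggregate(orders).forEach((region, agg) ->
            System.out.printf("%s: count=%.0f revenue=%.2f%n",
                region, agg[0], agg[1]));
    }
}
```

The merge function plays the role that ksqlDB's aggregator plays per changelog update: each new order folds into the running count and revenue for its key.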

Producing Test Data

Order Producer

package com.gonnect.ksql;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.gonnect.ksql.model.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class OrderProducer {

    private static final String TOPIC = "orders_topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final ObjectMapper mapper = new ObjectMapper();

    private static final String[] PRODUCTS = {
        "Laptop", "Phone", "Tablet", "Headphones", "Camera",
        "Watch", "Speaker", "Monitor", "Keyboard", "Mouse"
    };

    private static final String[] REGIONS = {
        "NORTH", "SOUTH", "EAST", "WEST", "CENTRAL"
    };

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (Producer<String, String> producer =
                 new KafkaProducer<>(props)) {

            Random random = new Random();

            for (int i = 0; i < 100; i++) {
                Order order = new Order(
                    "ORD-" + UUID.randomUUID().toString().substring(0, 8),
                    "CUST-" + random.nextInt(1000),
                    PRODUCTS[random.nextInt(PRODUCTS.length)],
                    random.nextInt(10) + 1,
                    50 + random.nextDouble() * 950,
                    REGIONS[random.nextInt(REGIONS.length)]
                );

                String value = mapper.writeValueAsString(order);

                ProducerRecord<String, String> record =
                    new ProducerRecord<>(TOPIC, order.getOrderId(), value);

                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Error: " + exception.getMessage());
                    } else {
                        System.out.printf(
                            "Sent order %s to partition %d offset %d%n",
                            order.getOrderId(),
                            metadata.partition(),
                            metadata.offset());
                    }
                });

                TimeUnit.MILLISECONDS.sleep(100);
            }
        }

        System.out.println("Finished producing orders");
    }
}

KSQL Query Examples

Stream Transformations

-- Filter orders by region
SELECT * FROM orders
WHERE region = 'NORTH'
EMIT CHANGES;

-- Calculate running totals
SELECT
    customerId,
    SUM(quantity * price) AS running_total
FROM orders
GROUP BY customerId
EMIT CHANGES;

-- Join orders with customer data
SELECT
    o.orderId,
    o.product,
    c.customerName,
    c.tier
FROM orders o
INNER JOIN customers c ON o.customerId = c.customerId
EMIT CHANGES;
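The running-total query above keeps per-key state between events. A minimal sketch of that behavior with a plain HashMap standing in for ksqlDB's state store (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class RunningTotalSketch {

    private final Map<String, Double> store = new HashMap<>();

    // Models the continuous SUM(quantity * price) GROUP BY customerId:
    // each incoming order updates the per-key state and, like
    // EMIT CHANGES, yields the new total for that key.
    double onOrder(String customerId, int quantity, double price) {
        return store.merge(customerId, quantity * price, Double::sum);
    }

    public static void main(String[] args) {
        RunningTotalSketch totals = new RunningTotalSketch();
        System.out.println(totals.onOrder("CUST-1", 2, 100.0)); // 200.0
        System.out.println(totals.onOrder("CUST-1", 1, 50.0));  // 250.0
        System.out.println(totals.onOrder("CUST-2", 1, 10.0));  // 10.0
    }
}
```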

Windowed Aggregations

-- Orders per minute
SELECT
    region,
    COUNT(*) AS orders_per_minute
FROM orders
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY region
EMIT CHANGES;

-- Moving average price per product
SELECT
    product,
    AVG(price) AS avg_price,
    MIN(price) AS min_price,
    MAX(price) AS max_price
FROM orders
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY product
EMIT CHANGES;
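The window arithmetic behind these queries is worth spelling out: a tumbling window aligns each timestamp down to a multiple of the window size, while a hopping window can place one event in up to size/advance overlapping windows. A small illustrative sketch of that arithmetic (not ksqlDB's internal code; windows are assumed to start at epoch 0, as in Kafka Streams):

```java
import java.util.ArrayList;
import java.util.List;

public class WindowMathSketch {

    // Tumbling: every timestamp falls in exactly one window,
    // aligned down to a multiple of the window size.
    static long tumblingWindowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Hopping: window starts advance by advanceMs, so a timestamp falls
    // in every window whose start lies in (ts - sizeMs, ts], clamped at 0.
    static List<Long> hoppingWindowStarts(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long first = tumblingWindowStart(ts, advanceMs);
        for (long start = first; start > ts - sizeMs && start >= 0;
                start -= advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 130_000L; // an event at 2m10s
        // 1-minute tumbling window:
        System.out.println(tumblingWindowStart(ts, 60_000L)); // 120000
        // 5-minute window hopping every minute:
        System.out.println(hoppingWindowStarts(ts, 300_000L, 60_000L));
    }
}
```

So the event at 2m10s lands in one tumbling window (starting at 2m00s) but in three of the 5-minute hopping windows, which is why hopping aggregations emit multiple rows per event.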

Complex Event Processing

-- Detect unusual order patterns
CREATE STREAM unusual_orders AS
SELECT *
FROM orders
WHERE quantity > 50
   OR (quantity * price) > 5000
EMIT CHANGES;

-- Session-based customer activity
SELECT
    customerId,
    COUNT(*) AS session_orders,
    SUM(quantity * price) AS session_value
FROM orders
WINDOW SESSION (30 MINUTES)
GROUP BY customerId
EMIT CHANGES;
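Unlike tumbling and hopping windows, session windows have no fixed size: events belong to the same session as long as the gap between consecutive events stays within the inactivity timeout. A small illustrative sketch of that grouping rule over a sorted list of timestamps:

```java
import java.util.ArrayList;
import java.util.List;

public class SessionWindowSketch {

    // Splits ascending event timestamps into sessions: a new session
    // starts whenever the gap since the previous event exceeds gapMs.
    static List<List<Long>> sessionize(List<Long> sortedTs, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTs) {
            if (!current.isEmpty()
                    && ts - current.get(current.size() - 1) > gapMs) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) sessions.add(current);
        return sessions;
    }

    public static void main(String[] args) {
        // Three orders within 30 minutes of each other, then one 2+ hours later:
        List<Long> ts = List.of(0L, 10L * 60_000, 25L * 60_000, 150L * 60_000);
        System.out.println(sessionize(ts, 30L * 60_000).size()); // 2 sessions
    }
}
```

ksqlDB additionally merges sessions per key and handles out-of-order arrivals; the sketch assumes one key and sorted input.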

Monitoring and Observability

Query Metrics

public void printQueryMetrics() {
    // Requires org.apache.kafka.common.Metric and
    // org.apache.kafka.common.MetricName imports.
    for (PersistentQueryMetadata query : ksqlEngine.getPersistentQueries()) {
        // Each persistent query is backed by a KafkaStreams instance;
        // its metrics() map is keyed by MetricName.
        Map<MetricName, ? extends Metric> metrics =
            query.getKafkaStreams().metrics();

        System.out.printf("Query: %s%n", query.getQueryId());
        metrics.forEach((name, metric) -> {
            // Print the stream-thread processing metrics; widen the filter
            // for task- or state-store-level metrics as needed.
            if ("process-total".equals(name.name())
                    || "process-rate".equals(name.name())) {
                System.out.printf("  %s = %s%n",
                    name.name(), metric.metricValue());
            }
        });
    }
}

JMX Metrics

# Enable DEBUG-level Kafka Streams metrics; in embedded ksqlDB,
# streams settings are passed with the "ksql.streams." prefix
ksql.streams.metrics.recording.level=DEBUG

# Expose metrics over JMX: these are JVM system properties,
# passed as -D flags at startup, not properties-file entries
java -Dcom.sun.management.jmxremote \
     -Dcom.sun.management.jmxremote.port=9999 \
     -Dcom.sun.management.jmxremote.authenticate=false \
     -Dcom.sun.management.jmxremote.ssl=false \
     -jar your-app.jar

Best Practices

Query Design

  • Filter Early: apply WHERE clauses before joins to reduce data volume
  • Use Appropriate Windows: choose window size based on business requirements
  • Avoid Large State: be mindful of GROUP BY cardinality
  • Key Selection: choose effective partition keys for parallelism
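The "filter early" rule is easy to quantify with a naive nested-loop join: the work grows with the product of the two input sizes, so shrinking one side before joining cuts the work proportionally. An illustrative sketch (record and field names are hypothetical; real stream joins are hash-partitioned, but the proportionality argument carries over):

```java
import java.util.List;

public class FilterEarlySketch {

    record Order(String customerId, String region) {}
    record Customer(String customerId, String tier) {}

    // A nested-loop join compares every order against every customer,
    // so its cost is |orders| * |customers| comparisons.
    static long joinComparisons(List<Order> orders, List<Customer> customers) {
        return (long) orders.size() * customers.size();
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order("C1", "NORTH"), new Order("C2", "SOUTH"),
            new Order("C3", "NORTH"), new Order("C4", "EAST"));
        List<Customer> customers = List.of(
            new Customer("C1", "GOLD"), new Customer("C2", "SILVER"));

        long unfiltered = joinComparisons(orders, customers);
        // Applying WHERE region = 'NORTH' before the join halves the left side:
        long filtered = joinComparisons(
            orders.stream().filter(o -> o.region().equals("NORTH")).toList(),
            customers);
        System.out.println(unfiltered + " vs " + filtered); // 8 vs 4
    }
}
```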

Performance Tuning

// Increase buffer sizes for high throughput
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
    10 * 1024 * 1024L); // 10 MB

// Adjust commit interval
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

// Configure state store
config.put(StreamsConfig.STATE_DIR_CONFIG, "/fast-ssd/ksql-state");

Conclusion

Embedded KSQL provides a powerful approach to stream processing that combines:

  • SQL Simplicity: Familiar SQL syntax for stream processing
  • Deployment Flexibility: Single artifact without external KSQL server
  • Full Integration: Direct access to Kafka Streams APIs when needed
  • Real-Time Analytics: Continuous queries for instant insights

This approach is ideal for:

  1. Microservices requiring self-contained stream processing
  2. Applications with specific processing needs
  3. Environments where external KSQL server is impractical
  4. Use cases requiring tight integration with application code

The kafka-ksql project demonstrates that sophisticated stream processing can be achieved within your application, bringing the power of KSQL directly to your codebase.

