Drools + Apache NiFi: Building a Rules Engine Processor for Data Flow Automation

Learn how to integrate Drools Business Rules Management System with Apache NiFi to create powerful rule-based data flow processors. Enable declarative IF/THEN logic without embedding rules in application code.

Gonnect Team
January 14, 2024 · 12 min read
Tags: Drools, Apache NiFi, Java, BRMS

Introduction

Modern data pipelines often require complex decision logic: routing, filtering, transformation, and validation based on business rules. Traditionally, this logic is embedded directly in application code, so changing a rule requires a developer and a new deployment.

What if business analysts could define and modify rules without touching code? What if these rules could be seamlessly integrated into your data flows?

Enter the powerful combination of Drools (JBoss Rules) and Apache NiFi. Drools provides a robust Business Rules Management System (BRMS) with a RETE-based inference engine, while NiFi offers visual data flow automation with powerful extension capabilities.

Key Insight: By integrating Drools with NiFi, you can externalize business logic from your data pipelines, enabling business users to modify rules while data engineers focus on data movement and transformation.

Understanding the Components

Drools (JBoss Rules)

Drools is a Business Rules Management System (BRMS) solution that provides:

| Feature                  | Description                                    |
|--------------------------|------------------------------------------------|
| RETE Algorithm           | Highly efficient pattern matching algorithm    |
| Workbench                | Web-based authoring and management tools       |
| IDE Integration          | Eclipse and IntelliJ plugins for development   |
| Decision Tables          | Excel-based rule definition for business users |
| Complex Event Processing | Temporal reasoning and event correlation       |

Apache NiFi

Apache NiFi is a data automation platform designed for:

  • Data Flow Visualization: Drag-and-drop interface for building pipelines
  • Provenance Tracking: Complete data lineage and history
  • Processor Extensibility: Custom processors via NAR archives
  • Clustering: Horizontal scaling for enterprise workloads

Architecture Overview

The integration creates a custom NiFi processor that evaluates Drools rules against incoming FlowFiles:

+----------------+     +---------------------+     +----------------+
|   Data Source  |---->| RuleEngineProcessor |---->|   Downstream   |
|   (JSON/XML)   |     |   (Drools Rules)    |     |   Processors   |
+----------------+     +---------------------+     +----------------+
                              |
                              v
                       +--------------+
                       |   .drl Files |
                       |   (Rules)    |
                       +--------------+

Project Structure

The drools-nifi project consists of three primary modules:

drools-nifi/
├── drools-rules-reader/          # Rule interpretation component
│   ├── src/
│   │   └── main/java/
│   │       └── com/gonnect/rules/
│   │           ├── RuleEngine.java
│   │           └── RuleLoader.java
│   └── pom.xml
│
├── nifi-rules-processor/         # Core processor implementation
│   ├── src/
│   │   └── main/java/
│   │       └── com/gonnect/nifi/
│   │           └── RuleEngineProcessor.java
│   └── pom.xml
│
├── nifi-rules-processor-nar/     # NiFi NAR archive packaging
│   └── pom.xml
│
└── pom.xml                       # Parent POM

Building the Rules Reader

The rules reader module handles loading and compiling Drools rules.

Rule Loader

package com.gonnect.rules;

import org.kie.api.KieServices;
import org.kie.api.builder.KieBuilder;
import org.kie.api.builder.KieFileSystem;
import org.kie.api.builder.KieRepository;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.internal.io.ResourceFactory;

import java.io.File;

public class RuleLoader {

    private final KieServices kieServices;
    private KieContainer kieContainer;

    public RuleLoader() {
        this.kieServices = KieServices.Factory.get();
    }

    /**
     * Load rules from a directory containing .drl files
     */
    public void loadRules(String rulesDirectory) {
        KieFileSystem kieFileSystem = kieServices.newKieFileSystem();
        KieRepository kieRepository = kieServices.getRepository();

        File rulesDir = new File(rulesDirectory);
        if (!rulesDir.isDirectory()) {
            throw new IllegalArgumentException(
                "Rules path must be a directory: " + rulesDirectory);
        }

        // Load all .drl files from the directory
        File[] ruleFiles = rulesDir.listFiles(
            (dir, name) -> name.endsWith(".drl"));

        if (ruleFiles == null || ruleFiles.length == 0) {
            throw new IllegalStateException(
                "No .drl files found in: " + rulesDirectory);
        }

        for (File ruleFile : ruleFiles) {
            kieFileSystem.write(
                ResourceFactory.newFileResource(ruleFile));
        }

        KieBuilder kieBuilder = kieServices.newKieBuilder(kieFileSystem);
        kieBuilder.buildAll();

        if (kieBuilder.getResults().hasMessages(
                org.kie.api.builder.Message.Level.ERROR)) {
            throw new IllegalStateException(
                "Rule compilation errors: " +
                kieBuilder.getResults().getMessages());
        }

        this.kieContainer = kieServices.newKieContainer(
            kieRepository.getDefaultReleaseId());
    }

    /**
     * Create a new KieSession for rule execution
     */
    public KieSession createSession() {
        if (kieContainer == null) {
            throw new IllegalStateException(
                "Rules not loaded. Call loadRules() first.");
        }
        return kieContainer.newKieSession();
    }
}
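
One detail worth noting about the loader: `File.listFiles` makes no guarantee about the order of the entries it returns, so the order in which rule files are written to the `KieFileSystem` can vary between machines. If a reproducible order is wanted (for example to get stable compilation error output), the files can be sorted first. The following is a minimal sketch using only the JDK; the class `DrlFiles` and its method `listSorted` are hypothetical names and not part of the drools-nifi project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of the drools-nifi project.
final class DrlFiles {

    private DrlFiles() {
    }

    /** Returns the regular *.drl files directly inside dir, sorted by path. */
    static List<Path> listSorted(Path dir) {
        if (!Files.isDirectory(dir)) {
            throw new IllegalArgumentException(
                "Rules path must be a directory: " + dir);
        }
        // Files.list holds an open directory handle, so close it when done.
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".drl"))
                .sorted()
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Each returned `Path` can be converted with `toFile()` and passed to `ResourceFactory.newFileResource` as in the loader above.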

Rule Engine Executor

package com.gonnect.rules;

import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class RuleEngine {

    private final RuleLoader ruleLoader;

    public RuleEngine(String rulesDirectory) {
        this.ruleLoader = new RuleLoader();
        this.ruleLoader.loadRules(rulesDirectory);
    }

    /**
     * Execute rules against provided facts
     *
     * @param facts The objects to evaluate against rules
     * @return Map containing results and modified facts
     */
    public Map<String, Object> executeRules(Collection<Object> facts) {
        KieSession session = ruleLoader.createSession();
        Map<String, Object> results = new HashMap<>();

        try {
            // Insert all facts into the session
            for (Object fact : facts) {
                session.insert(fact);
            }

            // Fire all matching rules
            int rulesFired = session.fireAllRules();
            results.put("rulesFired", rulesFired);

            // Record how many facts remain in working memory
            Collection<FactHandle> factHandles =
                session.getFactHandles();
            results.put("factCount", factHandles.size());

            // Extract global results if any
            // (Globals exposes its identifiers via getGlobalKeys())
            Collection<String> globalNames =
                session.getGlobals().getGlobalKeys();
            for (String globalName : globalNames) {
                results.put(globalName, session.getGlobal(globalName));
            }

        } finally {
            session.dispose();
        }

        return results;
    }
}

Creating the NiFi Processor

The core of the integration is the custom NiFi processor that bridges NiFi FlowFiles with Drools rule execution.

RuleEngineProcessor

package com.gonnect.nifi;

import com.gonnect.rules.RuleEngine;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.annotation.documentation.*;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

@Tags({"drools", "rules", "engine", "decision", "brms"})
@CapabilityDescription(
    "Executes Drools business rules against incoming JSON FlowFiles. " +
    "Rules are loaded from a configured directory and evaluated against " +
    "the deserialized JSON content.")
@SeeAlso({})
@ReadsAttributes({
    @ReadsAttribute(attribute = "mime.type",
        description = "Expected to be application/json")
})
@WritesAttributes({
    @WritesAttribute(attribute = "rules.fired",
        description = "Number of rules that fired"),
    @WritesAttribute(attribute = "rules.evaluation.time",
        description = "Time taken to evaluate rules in milliseconds")
})
public class RuleEngineProcessor extends AbstractProcessor {

    public static final PropertyDescriptor RULES_DIRECTORY =
        new PropertyDescriptor.Builder()
            .name("rules-directory")
            .displayName("Rules Directory")
            .description("Directory containing Drools .drl rule files")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor FACT_CLASS =
        new PropertyDescriptor.Builder()
            .name("fact-class")
            .displayName("Fact Class Name")
            .description(
                "Fully qualified class name for JSON deserialization")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_SUCCESS =
        new Relationship.Builder()
            .name("success")
            .description("Successfully processed FlowFiles")
            .build();

    public static final Relationship REL_FAILURE =
        new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that failed rule evaluation")
            .build();

    public static final Relationship REL_MATCHED =
        new Relationship.Builder()
            .name("matched")
            .description("FlowFiles where rules matched")
            .build();

    public static final Relationship REL_UNMATCHED =
        new Relationship.Builder()
            .name("unmatched")
            .description("FlowFiles where no rules matched")
            .build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    private final AtomicReference<RuleEngine> ruleEngineRef =
        new AtomicReference<>();
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    protected void init(final ProcessorInitializationContext context) {
        descriptors = Arrays.asList(RULES_DIRECTORY, FACT_CLASS);
        relationships = new HashSet<>(Arrays.asList(
            REL_SUCCESS, REL_FAILURE, REL_MATCHED, REL_UNMATCHED));
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @Override
    public void onTrigger(final ProcessContext context,
                          final ProcessSession session)
            throws ProcessException {

        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Initialize rule engine if needed
        RuleEngine ruleEngine = ruleEngineRef.get();
        if (ruleEngine == null) {
            String rulesDir = context.getProperty(RULES_DIRECTORY)
                .getValue();
            ruleEngine = new RuleEngine(rulesDir);
            ruleEngineRef.set(ruleEngine);
        }

        try {
            // Read FlowFile content
            String content = readFlowFileContent(session, flowFile);

            // Deserialize to fact object
            String factClassName = context.getProperty(FACT_CLASS)
                .getValue();
            Class<?> factClass = Class.forName(factClassName);
            Object fact = objectMapper.readValue(content, factClass);

            // Execute rules
            long startTime = System.currentTimeMillis();
            Map<String, Object> results = ruleEngine.executeRules(
                Collections.singletonList(fact));
            long duration = System.currentTimeMillis() - startTime;

            // Update FlowFile attributes
            Map<String, String> attributes = new HashMap<>();
            attributes.put("rules.fired",
                String.valueOf(results.get("rulesFired")));
            attributes.put("rules.evaluation.time",
                String.valueOf(duration));

            flowFile = session.putAllAttributes(flowFile, attributes);

            // Serialize modified fact back to FlowFile
            String outputContent = objectMapper.writeValueAsString(fact);
            flowFile = writeFlowFileContent(
                session, flowFile, outputContent);

            // Route based on rules fired
            int rulesFired = (int) results.get("rulesFired");
            if (rulesFired > 0) {
                session.transfer(flowFile, REL_MATCHED);
            } else {
                session.transfer(flowFile, REL_UNMATCHED);
            }

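        } catch (Exception e) {
            getLogger().error("Rule evaluation failed", e);
            // getMessage() may be null, so fall back to toString()
            String message = e.getMessage() != null
                ? e.getMessage() : e.toString();
            flowFile = session.putAttribute(
                flowFile, "rules.error", message);
            session.transfer(flowFile, REL_FAILURE);
        }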
    }

    private String readFlowFileContent(ProcessSession session,
                                       FlowFile flowFile) {
        final StringBuilder content = new StringBuilder();
        session.read(flowFile, inputStream -> {
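            // Decode explicitly as UTF-8 instead of the platform default
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(inputStream,
                        java.nio.charset.StandardCharsets.UTF_8))) {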
                String line;
                while ((line = reader.readLine()) != null) {
                    content.append(line);
                }
            }
        });
        return content.toString();
    }

    private FlowFile writeFlowFileContent(ProcessSession session,
                                          FlowFile flowFile,
                                          String content) {
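        // Encode explicitly as UTF-8 instead of the platform default
        return session.write(flowFile, outputStream ->
            outputStream.write(content.getBytes(
                java.nio.charset.StandardCharsets.UTF_8)));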
    }
}
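
NiFi can call `onTrigger` from several threads at once when a processor is configured with more than one concurrent task, so the check-then-set on `ruleEngineRef` in the processor above may compile the rules more than once under load. One way to guarantee a single initialization is a small lazy holder, sketched below with only the JDK. The class name `Lazy` is hypothetical; in the processor, `T` would be `RuleEngine`. Building the engine in a method annotated with NiFi's `@OnScheduled` is another option.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical helper; in the processor, T would be RuleEngine.
final class Lazy<T> {

    private final Supplier<T> factory;
    // volatile so a fully constructed value is visible to every thread
    private volatile T value;

    Lazy(Supplier<T> factory) {
        this.factory = Objects.requireNonNull(factory);
    }

    /** Returns the value; the factory runs at most once if it succeeds. */
    T get() {
        T result = value;
        if (result == null) {
            synchronized (this) {
                // Re-check inside the lock: another thread may have won.
                result = value;
                if (result == null) {
                    result = Objects.requireNonNull(
                        factory.get(), "factory returned null");
                    value = result;
                }
            }
        }
        return result;
    }
}
```

If the factory throws, nothing is cached, so the next call retries. That matches the behavior of the original lazy initialization when rule compilation fails.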

Defining Business Rules

Rules are defined in .drl files using Drools Rule Language:

Example: Order Processing Rules

package com.gonnect.rules.orders

import com.gonnect.model.Order
import com.gonnect.model.OrderStatus

// Rule: High-value orders require approval
rule "High Value Order Requires Approval"
    when
        $order : Order(amount > 10000, status == OrderStatus.PENDING)
    then
        $order.setStatus(OrderStatus.PENDING_APPROVAL);
        $order.setApprovalRequired(true);
        System.out.println("Order " + $order.getId() +
            " requires approval (amount: " + $order.getAmount() + ")");
end

// Rule: Premium customers get expedited processing
rule "Premium Customer Expedited Processing"
    when
        $order : Order(customerTier == "PREMIUM",
                       status == OrderStatus.PENDING)
    then
        $order.setPriority("HIGH");
        $order.setExpedited(true);
        System.out.println("Order " + $order.getId() +
            " expedited for premium customer");
end

// Rule: Apply discount for bulk orders
rule "Bulk Order Discount"
    when
        $order : Order(quantity >= 100, discountApplied == false)
    then
        double discount = $order.getAmount() * 0.15;
        $order.setDiscount(discount);
        $order.setDiscountApplied(true);
        System.out.println("Applied 15% bulk discount: " + discount);
end

// Rule: Validate shipping address
rule "Shipping Address Validation"
    salience 100  // Higher priority
    when
        $order : Order(shippingAddress == null ||
                       shippingAddress.isEmpty())
    then
        $order.setStatus(OrderStatus.INVALID);
        $order.setValidationError("Shipping address is required");
end
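
The rules above refer to `com.gonnect.model.Order` and `OrderStatus`, which are not shown in this article. The sketch below is a hypothetical fact model inferred only from the properties and setters the rules use; the field types (for example `double` for the amount) are assumptions. It keeps a no-argument constructor and bean-style accessors, which is what Jackson's default deserialization and Drools property access both expect.

```java
// Hypothetical fact model inferred from the rules above. In a real project
// each type would be public, in its own file, in package com.gonnect.model.
enum OrderStatus { PENDING, PENDING_APPROVAL, INVALID }

class Order {
    private String id;
    private double amount;
    private int quantity;
    private String customerTier;
    private OrderStatus status;
    private String priority;
    private boolean expedited;
    private boolean approvalRequired;
    private double discount;
    private boolean discountApplied;
    private String shippingAddress;
    private String validationError;

    // No-argument constructor for JSON deserialization.
    public Order() { }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
    public String getCustomerTier() { return customerTier; }
    public void setCustomerTier(String tier) { this.customerTier = tier; }
    public OrderStatus getStatus() { return status; }
    public void setStatus(OrderStatus status) { this.status = status; }
    public String getPriority() { return priority; }
    public void setPriority(String priority) { this.priority = priority; }
    public boolean isExpedited() { return expedited; }
    public void setExpedited(boolean expedited) { this.expedited = expedited; }
    public boolean isApprovalRequired() { return approvalRequired; }
    public void setApprovalRequired(boolean required) { this.approvalRequired = required; }
    public double getDiscount() { return discount; }
    public void setDiscount(double discount) { this.discount = discount; }
    public boolean isDiscountApplied() { return discountApplied; }
    public void setDiscountApplied(boolean applied) { this.discountApplied = applied; }
    public String getShippingAddress() { return shippingAddress; }
    public void setShippingAddress(String address) { this.shippingAddress = address; }
    public String getValidationError() { return validationError; }
    public void setValidationError(String error) { this.validationError = error; }
}
```

A constraint such as `discountApplied == false` in a rule reads the property through `isDiscountApplied()`, so the accessor names matter as much as the field names.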

Decision Table Format

For business user accessibility, rules can be defined in Excel decision tables:

| RuleSet      | Order Processing Rules  |                  |             |
| Import       | com.gonnect.model.Order |                  |             |
| Sequential   | true                    |                  |             |
| CONDITION    | CONDITION               | ACTION           | ACTION      |
| $order:Order | $order:Order            | $order           | $order      |
| amount       | customerTier            | setStatus        | setPriority |
| Order Amount | Customer Tier           | New Status       | Priority    |
| >= 10000     |                         | PENDING_APPROVAL |             |
|              | PREMIUM                 |                  | HIGH        |
|              | GOLD                    |                  | MEDIUM      |
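
To make the table easier to read, each data row can be thought of as one IF/THEN rule: the CONDITION cells form the IF part and the ACTION cells form the THEN part. The plain-Java sketch below models the three data rows that way. It is only an illustration of the reading, not how Drools processes the spreadsheet (Drools compiles it to DRL), and the names `DecisionTableSketch`, `Row`, and the map-based order are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Plain-Java illustration only; Drools compiles the spreadsheet to DRL.
final class DecisionTableSketch {

    /** One spreadsheet row: a condition plus the action taken when it holds. */
    static final class Row {
        final String name;
        final Predicate<Map<String, Object>> condition;
        final Consumer<Map<String, Object>> action;

        Row(String name,
            Predicate<Map<String, Object>> condition,
            Consumer<Map<String, Object>> action) {
            this.name = name;
            this.condition = condition;
            this.action = action;
        }
    }

    /** The three data rows of the table, in sheet order. */
    static List<Row> rows() {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("amount >= 10000",
            o -> {
                Object amount = o.get("amount");
                return amount instanceof Number
                    && ((Number) amount).doubleValue() >= 10000;
            },
            o -> o.put("status", "PENDING_APPROVAL")));
        rows.add(new Row("customerTier PREMIUM",
            o -> "PREMIUM".equals(o.get("customerTier")),
            o -> o.put("priority", "HIGH")));
        rows.add(new Row("customerTier GOLD",
            o -> "GOLD".equals(o.get("customerTier")),
            o -> o.put("priority", "MEDIUM")));
        return rows;
    }

    /** Applies every matching row top to bottom; returns the rows that fired. */
    static List<String> apply(Map<String, Object> order) {
        List<String> fired = new ArrayList<>();
        for (Row row : rows()) {
            if (row.condition.test(order)) {
                row.action.accept(order);
                fired.add(row.name);
            }
        }
        return fired;
    }
}
```

The top-to-bottom loop mirrors the `Sequential true` setting, which asks Drools to fire the generated rules in sheet order.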

Building and Deploying

Build Process

# Clone the repository
git clone https://github.com/mgorav/drools-nifi.git
cd drools-nifi

# Build all modules
mvn clean install -DskipTests

# The NAR file is created at:
# nifi-rules-processor-nar/target/nifi-rules-processor-nar-1.0.nar

NiFi Deployment

# Copy NAR to NiFi lib directory
cp nifi-rules-processor-nar/target/nifi-rules-processor-nar-1.0.nar \
   $NIFI_HOME/lib/

# Restart NiFi to load the new processor
$NIFI_HOME/bin/nifi.sh restart

# Verify processor is available
# Navigate to NiFi UI and search for "RuleEngineProcessor"

NiFi Flow Configuration

Creating the Data Flow

  1. Add RuleEngineProcessor to the canvas
  2. Configure Properties:
    • Rules Directory: /opt/nifi/rules/orders/
    • Fact Class Name: com.gonnect.model.Order
  3. Connect Relationships:
    • matched -> Further processing
    • unmatched -> Default handling
    • failure -> Error handling
    • success -> Auto-terminate (the processor declares this relationship but never routes to it)

Complete Flow Example

+--------------+     +---------------------+     +---------------+
| GetFile      |---->| RuleEngineProcessor |---->| PutKafka      |
| (JSON Input) |     | (Order Rules)       |     | (Matched)     |
+--------------+     +---------------------+     +---------------+
                                |
                                | unmatched
                                v
                        +---------------+
                        | LogAttribute  |
                        | (Audit Log)   |
                        +---------------+

Monitoring and Debugging

Rule Execution Metrics

The processor adds attributes to FlowFiles for monitoring:

| Attribute             | Description                        |
|-----------------------|------------------------------------|
| rules.fired           | Number of rules that triggered     |
| rules.evaluation.time | Processing time in milliseconds    |
| rules.error           | Error message if evaluation failed |
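
Because FlowFile attributes are plain strings, anything that consumes these values has to parse them and cope with missing keys (on the failure path, for instance, `rules.error` may be the only one present). The sketch below shows one way to do that with only the JDK. The class `RuleMetrics` and the idea of a slowness threshold are hypothetical and not part of the processor.

```java
import java.util.Map;

// Hypothetical reader for the attributes listed above.
final class RuleMetrics {

    final int rulesFired;
    final long evaluationMillis;
    final String error; // null when rules.error is absent

    private RuleMetrics(int rulesFired, long evaluationMillis, String error) {
        this.rulesFired = rulesFired;
        this.evaluationMillis = evaluationMillis;
        this.error = error;
    }

    /** Builds metrics from an attribute map; missing or malformed numbers become 0. */
    static RuleMetrics fromAttributes(Map<String, String> attributes) {
        long fired = parseLong(attributes.get("rules.fired"));
        long millis = parseLong(attributes.get("rules.evaluation.time"));
        return new RuleMetrics(
            (int) fired, millis, attributes.get("rules.error"));
    }

    boolean failed() {
        return error != null;
    }

    /** True when evaluation took strictly longer than the given threshold. */
    boolean isSlow(long thresholdMillis) {
        return evaluationMillis > thresholdMillis;
    }

    private static long parseLong(String raw) {
        if (raw == null) {
            return 0L;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            return 0L;
        }
    }
}
```

In NiFi itself the same checks could be expressed with a routing processor and Expression Language; the Java form is shown here only to make the parsing rules explicit.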

NiFi Bulletins

Check the processor bulletins for errors. They are available in the NiFi UI under "Bulletin Board". Common issues:

  • Rule compilation errors
  • Class not found for fact deserialization
  • Invalid JSON input

Best Practices

Rule Design

// 1. Use salience for rule ordering
rule "Validation First"
    salience 1000
    when ...
    then ...
end

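// 2. Use agenda groups for organization
//    (rules in a group fire only while that group has focus,
//    for example via auto-focus or setFocus() on the agenda group)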
rule "Discount Rules - Bulk"
    agenda-group "discounts"
    when ...
    then ...
end

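// 3. Avoid infinite loops with modify: guard the condition so the rule
//    no longer matches after it fires (or add the no-loop attribute)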
rule "Update Status"
    when
        $order : Order(status == "NEW", processed == false)
    then
        modify($order) {
            setStatus("PROCESSING"),
            setProcessed(true)
        }
end
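
As a rough mental model for the first practice: among rules that are eligible to fire at the same time, the one with the higher salience goes first, and rules without a salience attribute default to 0. The plain-Java sketch below imitates that ordering with a sort. It is an illustration only, not the Drools agenda implementation, and the names `SalienceSketch` and `NamedRule` are hypothetical. How the sketch breaks ties (input order) is a property of the sketch, not a statement about Drools.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustration only: orders rule names by descending salience.
final class SalienceSketch {

    static final class NamedRule {
        final String name;
        final int salience;

        NamedRule(String name, int salience) {
            this.name = name;
            this.salience = salience;
        }
    }

    /** Returns rule names with the highest salience first. */
    static List<String> firingOrder(List<NamedRule> rules) {
        List<NamedRule> sorted = new ArrayList<>(rules);
        // List.sort is stable, so equal saliences keep their input order.
        sorted.sort(Comparator
            .comparingInt((NamedRule r) -> r.salience)
            .reversed());
        List<String> names = new ArrayList<>();
        for (NamedRule rule : sorted) {
            names.add(rule.name);
        }
        return names;
    }
}
```

With the rules from this article, "Validation First" (1000) would come before "Shipping Address Validation" (100), which in turn would come before any rule left at the default of 0.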

Performance Optimization

| Technique          | Benefit                               |
|--------------------|---------------------------------------|
| Rule Indexing      | Faster pattern matching               |
| Stateless Sessions | Reduced memory for simple evaluations |
| Rule Groups        | Control rule execution scope          |
| Lazy Loading       | Initialize rules on first use         |

Conclusion

The integration of Drools with Apache NiFi creates a powerful platform for rule-based data flow automation. This combination offers:

  • Separation of Concerns: Business rules externalized from data pipeline code
  • Visual Rule Management: Business analysts can understand and modify rules
  • Data Flow Visualization: NiFi's intuitive interface for pipeline design
  • Scalability: NiFi clustering with stateless Drools evaluation
  • Auditability: Complete data provenance and rule execution tracking

By implementing the RuleEngineProcessor, organizations can:

  1. Empower business users to define and modify rules
  2. Reduce deployment cycles for rule changes
  3. Maintain clear separation between data flow and business logic
  4. Leverage NiFi's robust data handling with Drools' powerful inference

The drools-nifi project provides a production-ready foundation for building intelligent, rule-driven data pipelines.

