Drools + Apache NiFi: Building a Rules Engine Processor for Data Flow Automation
Learn how to integrate the Drools Business Rules Management System (BRMS) with Apache NiFi to create rule-based data flow processors. Enable declarative IF/THEN logic without embedding rules in application code.
Introduction
Modern data pipelines often require complex decision logic: routing, filtering, transformation, and validation based on business rules. Traditionally, this logic is embedded directly in application code, so changing it requires a developer and a redeployment.
What if business analysts could define and modify rules without touching code? What if these rules could be seamlessly integrated into your data flows?
Enter the powerful combination of Drools (JBoss Rules) and Apache NiFi. Drools provides a robust Business Rules Management System (BRMS) with a RETE-based inference engine, while NiFi offers visual data flow automation with powerful extension capabilities.
Key Insight: By integrating Drools with NiFi, you can externalize business logic from your data pipelines, enabling business users to modify rules while data engineers focus on data movement and transformation.
Understanding the Components
Drools (JBoss Rules)
Drools is a Business Rules Management System (BRMS) solution that provides:
| Feature | Description |
|---|---|
| RETE Algorithm | Efficient pattern matching; Drools 6 and later use PHREAK, a lazy evolution of RETE |
| Workbench | Web-based authoring and management tools |
| IDE Integration | Eclipse and IntelliJ plugins for development |
| Decision Tables | Excel-based rule definition for business users |
| Complex Event Processing | Temporal reasoning and event correlation |
Apache NiFi
Apache NiFi is a data automation platform designed for:
- Data Flow Visualization: Drag-and-drop interface for building pipelines
- Provenance Tracking: Complete data lineage and history
- Processor Extensibility: Custom processors via NAR archives
- Clustering: Horizontal scaling for enterprise workloads
Architecture Overview
The integration creates a custom NiFi processor that evaluates Drools rules against incoming FlowFiles:
+----------------+     +---------------------+     +----------------+
|  Data Source   |---->| RuleEngineProcessor |---->|   Downstream   |
|   (JSON/XML)   |     |   (Drools Rules)    |     |   Processors   |
+----------------+     +---------------------+     +----------------+
                                  |
                                  v
                           +--------------+
                           |  .drl Files  |
                           |   (Rules)    |
                           +--------------+
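To make the diagram concrete, the following is a minimal plain-Java sketch of the idea: a fact goes in, each rule's condition is checked, matching actions run, and the count of fired rules decides the route. `SimpleRule` and `RuleFlowSketch` are hypothetical names introduced only for illustration; this is not the Drools API, which compiles .drl text into a pattern-matching network rather than looping over a list.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for one rule: a WHEN condition plus a THEN action.
// Not the Drools API; it only illustrates the IF/THEN shape of a rule.
final class SimpleRule<T> {
    final String name;
    final Predicate<T> when;
    final Consumer<T> then;

    SimpleRule(String name, Predicate<T> when, Consumer<T> then) {
        this.name = name;
        this.when = when;
        this.then = then;
    }
}

final class RuleFlowSketch {
    // Checks every rule in list order and runs the action of each rule whose
    // condition holds. Returns how many actions ran (compare "rulesFired").
    static <T> int fireAll(List<SimpleRule<T>> rules, T fact) {
        int fired = 0;
        for (SimpleRule<T> rule : rules) {
            if (rule.when.test(fact)) {
                rule.then.accept(fact);
                fired++;
            }
        }
        return fired;
    }

    // Mirrors the routing idea: any fired rule means "matched".
    static String route(int rulesFired) {
        return rulesFired > 0 ? "matched" : "unmatched";
    }
}
```

In the real processor the list of rules is replaced by compiled .drl files, so the conditions and actions can change without recompiling the Java code.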
Project Structure
The drools-nifi project consists of three primary modules:
drools-nifi/
├── drools-rules-reader/           # Rule interpretation component
│   ├── src/
│   │   └── main/java/
│   │       └── com/gonnect/rules/
│   │           ├── RuleEngine.java
│   │           └── RuleLoader.java
│   └── pom.xml
│
├── nifi-rules-processor/          # Core processor implementation
│   ├── src/
│   │   └── main/java/
│   │       └── com/gonnect/nifi/
│   │           └── RuleEngineProcessor.java
│   └── pom.xml
│
├── nifi-rules-processor-nar/      # NiFi NAR archive packaging
│   └── pom.xml
│
└── pom.xml                        # Parent POM
Building the Rules Reader
The rules reader module handles loading and compiling Drools rules.
Rule Loader
package com.gonnect.rules;

import org.kie.api.KieServices;
import org.kie.api.builder.KieBuilder;
import org.kie.api.builder.KieFileSystem;
import org.kie.api.builder.KieRepository;
import org.kie.api.builder.Message;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.internal.io.ResourceFactory;

import java.io.File;

public class RuleLoader {

    private final KieServices kieServices;
    private KieContainer kieContainer;

    public RuleLoader() {
        this.kieServices = KieServices.Factory.get();
    }

    /**
     * Load rules from a directory containing .drl files
     */
    public void loadRules(String rulesDirectory) {
        KieFileSystem kieFileSystem = kieServices.newKieFileSystem();
        KieRepository kieRepository = kieServices.getRepository();

        File rulesDir = new File(rulesDirectory);
        if (!rulesDir.isDirectory()) {
            throw new IllegalArgumentException(
                "Rules path must be a directory: " + rulesDirectory);
        }

        // Load all .drl files from the directory
        File[] ruleFiles = rulesDir.listFiles(
            (dir, name) -> name.endsWith(".drl"));
        if (ruleFiles == null || ruleFiles.length == 0) {
            throw new IllegalStateException(
                "No .drl files found in: " + rulesDirectory);
        }

        for (File ruleFile : ruleFiles) {
            // Write each file under an explicit src/main/resources path so the
            // KieBuilder picks it up regardless of where it lives on disk
            kieFileSystem.write(
                "src/main/resources/rules/" + ruleFile.getName(),
                ResourceFactory.newFileResource(ruleFile));
        }

        // Compile the rules and fail fast on compilation errors
        KieBuilder kieBuilder = kieServices.newKieBuilder(kieFileSystem);
        kieBuilder.buildAll();
        if (kieBuilder.getResults().hasMessages(Message.Level.ERROR)) {
            throw new IllegalStateException(
                "Rule compilation errors: " +
                kieBuilder.getResults().getMessages());
        }

        this.kieContainer = kieServices.newKieContainer(
            kieRepository.getDefaultReleaseId());
    }

    /**
     * Create a new KieSession for rule execution
     */
    public KieSession createSession() {
        if (kieContainer == null) {
            throw new IllegalStateException(
                "Rules not loaded. Call loadRules() first.");
        }
        return kieContainer.newKieSession();
    }
}
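One detail worth noting about the directory scan: `File.listFiles` makes no guarantee about the order of the returned files. The sketch below shows one possible alternative using `java.nio.file`, which skips subdirectories and sorts the result so rule files are always processed in the same order. `DrlFileFinder` is a hypothetical helper, not part of the drools-nifi project.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not in the original project): finds .drl files
// in a directory and returns them in a stable, sorted order.
final class DrlFileFinder {
    static List<Path> findDrlFiles(Path rulesDirectory) throws IOException {
        if (!Files.isDirectory(rulesDirectory)) {
            throw new IllegalArgumentException(
                "Rules path must be a directory: " + rulesDirectory);
        }
        // Files.list holds an open directory handle, so close it when done
        try (Stream<Path> entries = Files.list(rulesDirectory)) {
            return entries
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".drl"))
                .sorted()
                .collect(Collectors.toList());
        }
    }
}
```

A sorted list mainly helps reproducibility: compilation error messages and logs then appear in the same order on every node.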
Rule Engine Executor
package com.gonnect.rules;

import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class RuleEngine {

    private final RuleLoader ruleLoader;

    public RuleEngine(String rulesDirectory) {
        this.ruleLoader = new RuleLoader();
        this.ruleLoader.loadRules(rulesDirectory);
    }

    /**
     * Execute rules against provided facts
     *
     * @param facts The objects to evaluate against rules
     * @return Map with the number of rules fired, the fact count, and any globals
     */
    public Map<String, Object> executeRules(Collection<Object> facts) {
        KieSession session = ruleLoader.createSession();
        Map<String, Object> results = new HashMap<>();
        try {
            // Insert all facts into the session
            for (Object fact : facts) {
                session.insert(fact);
            }

            // Fire all matching rules
            int rulesFired = session.fireAllRules();
            results.put("rulesFired", rulesFired);

            // Count the facts left in working memory after the rules ran
            Collection<FactHandle> factHandles = session.getFactHandles();
            results.put("factCount", factHandles.size());

            // Extract global results if any
            // (Globals exposes getGlobalKeys(), not a Map-style keySet())
            for (String globalName : session.getGlobals().getGlobalKeys()) {
                results.put(globalName, session.getGlobal(globalName));
            }
        } finally {
            // Always release the session, even if a rule throws
            session.dispose();
        }
        return results;
    }
}
Creating the NiFi Processor
The core of the integration is the custom NiFi processor that bridges NiFi FlowFiles with Drools rule execution.
RuleEngineProcessor
package com.gonnect.nifi;

import com.gonnect.rules.RuleEngine;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.annotation.documentation.*;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

@Tags({"drools", "rules", "engine", "decision", "brms"})
@CapabilityDescription(
    "Executes Drools business rules against incoming JSON FlowFiles. " +
    "Rules are loaded from a configured directory and evaluated against " +
    "the deserialized JSON content.")
@SeeAlso({})
@ReadsAttributes({
    @ReadsAttribute(attribute = "mime.type",
        description = "Expected to be application/json")
})
@WritesAttributes({
    @WritesAttribute(attribute = "rules.fired",
        description = "Number of rules that fired"),
    @WritesAttribute(attribute = "rules.evaluation.time",
        description = "Time taken to evaluate rules in milliseconds")
})
public class RuleEngineProcessor extends AbstractProcessor {

    public static final PropertyDescriptor RULES_DIRECTORY =
        new PropertyDescriptor.Builder()
            .name("rules-directory")
            .displayName("Rules Directory")
            .description("Directory containing Drools .drl rule files")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor FACT_CLASS =
        new PropertyDescriptor.Builder()
            .name("fact-class")
            .displayName("Fact Class Name")
            .description(
                "Fully qualified class name for JSON deserialization")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    // Declared for completeness; onTrigger below routes only to
    // matched, unmatched, or failure
    public static final Relationship REL_SUCCESS =
        new Relationship.Builder()
            .name("success")
            .description("Successfully processed FlowFiles")
            .build();

    public static final Relationship REL_FAILURE =
        new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that failed rule evaluation")
            .build();

    public static final Relationship REL_MATCHED =
        new Relationship.Builder()
            .name("matched")
            .description("FlowFiles where rules matched")
            .build();

    public static final Relationship REL_UNMATCHED =
        new Relationship.Builder()
            .name("unmatched")
            .description("FlowFiles where no rules matched")
            .build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    private final AtomicReference<RuleEngine> ruleEngineRef =
        new AtomicReference<>();
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    protected void init(final ProcessorInitializationContext context) {
        descriptors = Arrays.asList(RULES_DIRECTORY, FACT_CLASS);
        relationships = new HashSet<>(Arrays.asList(
            REL_SUCCESS, REL_FAILURE, REL_MATCHED, REL_UNMATCHED));
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @Override
    public void onTrigger(final ProcessContext context,
                          final ProcessSession session)
            throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Initialize rule engine if needed. This check-then-set is not
        // atomic, so two concurrent tasks may each build an engine.
        RuleEngine ruleEngine = ruleEngineRef.get();
        if (ruleEngine == null) {
            String rulesDir = context.getProperty(RULES_DIRECTORY)
                .getValue();
            ruleEngine = new RuleEngine(rulesDir);
            ruleEngineRef.set(ruleEngine);
        }

        try {
            // Read FlowFile content
            String content = readFlowFileContent(session, flowFile);

            // Deserialize to fact object
            String factClassName = context.getProperty(FACT_CLASS)
                .getValue();
            Class<?> factClass = Class.forName(factClassName);
            Object fact = objectMapper.readValue(content, factClass);

            // Execute rules
            long startTime = System.currentTimeMillis();
            Map<String, Object> results = ruleEngine.executeRules(
                Collections.singletonList(fact));
            long duration = System.currentTimeMillis() - startTime;

            // Update FlowFile attributes
            Map<String, String> attributes = new HashMap<>();
            attributes.put("rules.fired",
                String.valueOf(results.get("rulesFired")));
            attributes.put("rules.evaluation.time",
                String.valueOf(duration));
            flowFile = session.putAllAttributes(flowFile, attributes);

            // Serialize modified fact back to FlowFile
            String outputContent = objectMapper.writeValueAsString(fact);
            flowFile = writeFlowFileContent(
                session, flowFile, outputContent);

            // Route based on rules fired
            int rulesFired = (int) results.get("rulesFired");
            if (rulesFired > 0) {
                session.transfer(flowFile, REL_MATCHED);
            } else {
                session.transfer(flowFile, REL_UNMATCHED);
            }
        } catch (Exception e) {
            getLogger().error("Rule evaluation failed", e);
            flowFile = session.putAttribute(
                flowFile, "rules.error", e.getMessage());
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    private String readFlowFileContent(ProcessSession session,
                                       FlowFile flowFile) {
        final StringBuilder content = new StringBuilder();
        session.read(flowFile, inputStream -> {
            // Decode as UTF-8 explicitly instead of the platform default
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(inputStream,
                        StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    content.append(line);
                }
            }
        });
        return content.toString();
    }

    private FlowFile writeFlowFileContent(ProcessSession session,
                                          FlowFile flowFile,
                                          String content) {
        // Encode as UTF-8 to match how the content was read
        return session.write(flowFile, outputStream ->
            outputStream.write(content.getBytes(StandardCharsets.UTF_8)));
    }
}
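The lazy initialization in onTrigger reads the AtomicReference and then sets it in two separate steps. NiFi can call onTrigger from several threads when a processor is configured with more than one concurrent task, so two threads could each compile the rules. Below is a minimal plain-Java sketch of one way to guarantee a single build, using double-checked locking. `LazyHolder` is a hypothetical class, not part of the project; another option would be to build the engine in a method annotated with NiFi's `@OnScheduled`.

```java
import java.util.function.Supplier;

// Hypothetical helper (not in the original project): builds a value at most
// once, even when get() is called from many threads at the same time.
final class LazyHolder<T> {
    private final Supplier<T> factory;
    private volatile T value; // volatile so other threads see the built value

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = value;
        if (local == null) {               // fast path: already built
            synchronized (this) {
                local = value;
                if (local == null) {       // re-check while holding the lock
                    local = factory.get(); // assumed to return non-null
                    value = local;
                }
            }
        }
        return local;
    }

    // Drops the cached value so the next get() rebuilds it,
    // for example after the rule files have changed.
    synchronized void reset() {
        value = null;
    }
}
```

In the processor, the holder's factory would be something like `() -> new RuleEngine(rulesDir)`, and `reset()` could be called when the processor is stopped or rescheduled so that edited rules are picked up.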
Defining Business Rules
Rules are defined in .drl files using Drools Rule Language:
Example: Order Processing Rules
package com.gonnect.rules.orders

import com.gonnect.model.Order
import com.gonnect.model.OrderStatus

// Rule: High-value orders require approval
rule "High Value Order Requires Approval"
    when
        $order : Order(amount > 10000, status == OrderStatus.PENDING)
    then
        $order.setStatus(OrderStatus.PENDING_APPROVAL);
        $order.setApprovalRequired(true);
        System.out.println("Order " + $order.getId() +
            " requires approval (amount: " + $order.getAmount() + ")");
end

// Rule: Premium customers get expedited processing
rule "Premium Customer Expedited Processing"
    when
        $order : Order(customerTier == "PREMIUM",
                       status == OrderStatus.PENDING)
    then
        $order.setPriority("HIGH");
        $order.setExpedited(true);
        System.out.println("Order " + $order.getId() +
            " expedited for premium customer");
end

// Rule: Apply discount for bulk orders
rule "Bulk Order Discount"
    when
        $order : Order(quantity >= 100, discountApplied == false)
    then
        double discount = $order.getAmount() * 0.15;
        $order.setDiscount(discount);
        $order.setDiscountApplied(true);
        System.out.println("Applied 15% bulk discount: " + discount);
end

// Rule: Validate shipping address
rule "Shipping Address Validation"
    salience 100 // Higher priority
    when
        $order : Order(shippingAddress == null ||
                       shippingAddress.isEmpty())
    then
        $order.setStatus(OrderStatus.INVALID);
        $order.setValidationError("Shipping address is required");
end
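The rules above reference an Order fact and an OrderStatus enum from com.gonnect.model, neither of which is shown. The following is a minimal sketch of what such a fact class might look like, inferred only from the property names the rules use. The field types and the set of enum constants are assumptions, and in a real module each type would be public, in its own file, under the com.gonnect.model package.

```java
// Assumed status values: only the three constants used by the rules above
enum OrderStatus { PENDING, PENDING_APPROVAL, INVALID }

// Hypothetical fact class; Jackson and Drools both work with plain
// getters/setters and a no-argument constructor.
class Order {
    private String id;
    private double amount;
    private int quantity;
    private String customerTier;
    private String shippingAddress;
    private OrderStatus status = OrderStatus.PENDING;
    private String priority;
    private boolean approvalRequired;
    private boolean expedited;
    private boolean discountApplied;
    private double discount;
    private String validationError;

    public Order() { }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
    public String getCustomerTier() { return customerTier; }
    public void setCustomerTier(String customerTier) { this.customerTier = customerTier; }
    public String getShippingAddress() { return shippingAddress; }
    public void setShippingAddress(String shippingAddress) { this.shippingAddress = shippingAddress; }
    public OrderStatus getStatus() { return status; }
    public void setStatus(OrderStatus status) { this.status = status; }
    public String getPriority() { return priority; }
    public void setPriority(String priority) { this.priority = priority; }
    public boolean isApprovalRequired() { return approvalRequired; }
    public void setApprovalRequired(boolean approvalRequired) { this.approvalRequired = approvalRequired; }
    public boolean isExpedited() { return expedited; }
    public void setExpedited(boolean expedited) { this.expedited = expedited; }
    public boolean isDiscountApplied() { return discountApplied; }
    public void setDiscountApplied(boolean discountApplied) { this.discountApplied = discountApplied; }
    public double getDiscount() { return discount; }
    public void setDiscount(double discount) { this.discount = discount; }
    public String getValidationError() { return validationError; }
    public void setValidationError(String validationError) { this.validationError = validationError; }
}
```

Because the processor serializes the fact back to JSON after the rules run, every property a rule sets (status, priority, discount, and so on) appears in the outgoing FlowFile content.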
Decision Table Format
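For business user accessibility, rules can also be defined in Excel decision tables. Note that the RuleLoader shown earlier only picks up .drl files, so a spreadsheet would need its own loading step: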
| RuleSet | Order Processing Rules |
|---|---|
| Import | com.gonnect.model.Order |
| Sequential | true |
| CONDITION | CONDITION | ACTION | ACTION |
|---|---|---|---|
| $order:Order | $order:Order | $order | $order |
| amount | customerTier | setStatus | setPriority |
| Order Amount | Customer Tier | New Status | Priority |
| >= 10000 | | PENDING_APPROVAL | |
| | PREMIUM | | HIGH |
| | GOLD | | MEDIUM |
Building and Deploying
Build Process
# Clone the repository
git clone https://github.com/mgorav/drools-nifi.git
cd drools-nifi
# Build all modules
mvn clean install -DskipTests
# The NAR file is created at:
# nifi-rules-processor-nar/target/nifi-rules-processor-nar-1.0.nar
NiFi Deployment
# Copy NAR to NiFi lib directory
cp nifi-rules-processor-nar/target/nifi-rules-processor-nar-1.0.nar \
$NIFI_HOME/lib/
# Restart NiFi to load the new processor
$NIFI_HOME/bin/nifi.sh restart
# Verify processor is available
# Navigate to NiFi UI and search for "RuleEngineProcessor"
NiFi Flow Configuration
Creating the Data Flow
- Add RuleEngineProcessor to the canvas
- Configure Properties:
  - Rules Directory: /opt/nifi/rules/orders/
  - Fact Class Name: com.gonnect.model.Order
- Connect Relationships:
  - matched -> Further processing
  - unmatched -> Default handling
  - failure -> Error handling
  - success -> Auto-terminate (the processor as shown never routes FlowFiles here)
Complete Flow Example
+--------------+     +---------------------+     +---------------+
|   GetFile    |---->| RuleEngineProcessor |---->|   PutKafka    |
| (JSON Input) |     |    (Order Rules)    |     |   (Matched)   |
+--------------+     +---------------------+     +---------------+
                                |
                                | unmatched
                                v
                        +---------------+
                        | LogAttribute  |
                        |  (Audit Log)  |
                        +---------------+
Monitoring and Debugging
Rule Execution Metrics
The processor adds attributes to FlowFiles for monitoring:
| Attribute | Description |
|---|---|
| rules.fired | Number of rules that triggered |
| rules.evaluation.time | Processing time in milliseconds |
| rules.error | Error message if evaluation failed |
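The processor as shown measures evaluation time with System.currentTimeMillis(), which follows the wall clock and can jump if the system time is adjusted. A small sketch of building the same two attributes from System.nanoTime() readings, which are intended for measuring elapsed time, might look like this. `RuleMetrics` is a hypothetical helper, not part of the project.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not in the original project): turns a rule count and
// two System.nanoTime() readings into the processor's monitoring attributes.
final class RuleMetrics {
    static Map<String, String> toAttributes(int rulesFired,
                                            long startNanos,
                                            long endNanos) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("rules.fired", String.valueOf(rulesFired));
        // Convert the elapsed nanoseconds to whole milliseconds
        attributes.put("rules.evaluation.time",
            String.valueOf(TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos)));
        return attributes;
    }
}
```

The caller would take `long start = System.nanoTime()` before executing the rules and pass `System.nanoTime()` as the end reading afterwards.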
NiFi Bulletins
# Check processor bulletins for errors
# Available in NiFi UI under "Bulletin Board"
# Common issues:
# - Rule compilation errors
# - Class not found for fact deserialization
# - Invalid JSON input
Best Practices
Rule Design
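// 1. Use salience for rule ordering (higher values fire first)
rule "Validation First"
    salience 1000
    when
        // ... conditions
    then
        // ... actions
end

// 2. Use agenda groups for organization
//    (a group's rules fire only once the group has focus)
rule "Discount Rules - Bulk"
    agenda-group "discounts"
    when
        // ... conditions
    then
        // ... actions
end

// 3. Avoid infinite loops with modify: guard the rule with a constraint
//    that its own action makes false (here, processed == false)
rule "Update Status"
    when
        $order : Order(status == "NEW", processed == false)
    then
        modify($order) {
            setStatus("PROCESSING"),
            setProcessed(true)
        }
end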
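To see why the guard in the third rule matters, here is a deliberately simplified plain-Java model of re-activation: after a modify, the engine re-checks the rule against the updated fact and fires it again if the condition still holds. `ReactivationSketch` and its `maxFirings` cap are hypothetical and exist only to keep the illustration finite; the real Drools agenda is considerably more involved.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, simplified model of a single rule being re-evaluated
// after each modification of the fact it matched.
final class ReactivationSketch {
    // Fires the action while the condition still holds, up to maxFirings.
    // Returns the number of firings; reaching the cap signals a loop.
    static <T> int fireUntilStable(Predicate<T> when, Consumer<T> then,
                                   T fact, int maxFirings) {
        int firings = 0;
        while (firings < maxFirings && when.test(fact)) {
            then.accept(fact);
            firings++;
        }
        return firings;
    }
}
```

A rule whose action falsifies its own condition settles after one firing in this model, while a rule that keeps its condition true (for example, one that matches `amount > 0` and then increases the amount) runs until the cap. In Drools itself, the `no-loop` rule attribute is another way to stop a rule from re-activating on its own changes.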
Performance Optimization
| Technique | Benefit |
|---|---|
| Rule Indexing | Faster pattern matching |
| Stateless Sessions | Reduced memory for simple evaluations |
| Rule Groups | Control rule execution scope |
| Lazy Loading | Initialize rules on first use |
Conclusion
The integration of Drools with Apache NiFi creates a powerful platform for rule-based data flow automation. This combination offers:
- Separation of Concerns: Business rules externalized from data pipeline code
- Visual Rule Management: Business analysts can understand and modify rules
- Data Flow Visualization: NiFi's intuitive interface for pipeline design
- Scalability: NiFi clustering combined with a short-lived Drools session per FlowFile, so no rule state is shared between nodes
- Auditability: Complete data provenance and rule execution tracking
By implementing the RuleEngineProcessor, organizations can:
- Empower business users to define and modify rules
- Reduce deployment cycles for rule changes
- Maintain clear separation between data flow and business logic
- Leverage NiFi's robust data handling with Drools' powerful inference
The drools-nifi project provides a production-ready foundation for building intelligent, rule-driven data pipelines.