Vector: High-Performance Data Pipelines for Modern Observability
How Vector's Rust-based architecture delivers 10x better performance than alternatives for log routing, transformation, and multi-destination delivery in AI/ML platforms
The Data Pipeline Challenge
Modern observability generates massive data volumes: logs from Kubernetes pods, metrics from application instrumentation, traces from distributed systems, and telemetry from AI/ML workloads. Traditional tools like Fluentd and Logstash struggle with this scale, consuming excessive resources and introducing latency.
Vector, an open-source tool written in Rust (created by Timber.io and now maintained by Datadog), takes a fundamentally different approach: zero-copy processing where possible, memory safety without garbage collection, and a unified pipeline for all observability data.
Why Vector?
Traditional Tools Pain Points
| Tool | Language | Memory | Throughput | GC Pauses |
|---|---|---|---|---|
| Fluentd | Ruby | ~200MB | ~1TB/day | Ruby GC |
| Logstash | Java | ~500MB+ | ~1TB/day | JVM GC |
| Vector | Rust | ~50MB | 10TB+/day | None |
Vector's Advantages
- 10x performance: Rust's zero-cost abstractions
- Memory efficient: No garbage collector, predictable memory usage
- Type-safe transforms: VRL (Vector Remap Language) catches errors at compile time, before the pipeline starts
- Unified pipeline: Logs, metrics, and traces in one tool
- 50+ sources, 60+ sinks: Connect anything to anything
Vector Architecture
(Figure: Vector pipeline architecture — sources → transforms → sinks)
Core Concepts
1. Sources
Data ingestion from external systems. Vector supports 50+ source types:
- kubernetes_logs: Native K8s log collection
- file: Tail log files with checkpointing
- kafka: Consume from Kafka topics
- opentelemetry: Receive OTLP data
- prometheus_scrape: Pull Prometheus metrics
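As a sketch of how sources are declared, here are two common ones; the file path, broker address, and topic name are placeholders for this example:

```toml
# Tail application log files; offsets are checkpointed across restarts
[sources.app_logs]
type = "file"
include = ["/var/log/app/*.log"]  # placeholder path
read_from = "beginning"

# Consume events from a Kafka topic
[sources.events]
type = "kafka"
bootstrap_servers = "kafka:9092"  # placeholder broker
group_id = "vector"
topics = ["app-events"]
```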
2. Transforms
Data processing and manipulation using VRL:
- remap: Transform events with VRL
- filter: Drop events by condition
- route: Split streams by condition
- aggregate: Window-based aggregations
- dedupe: Remove duplicate events
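For illustration, a filter and a dedupe transform chained off the kubernetes_logs source might look like this; the `.level` field and match keys are assumptions for the example:

```toml
# Drop debug-level noise before further processing
[transforms.drop_debug]
type = "filter"
inputs = ["kubernetes_logs"]
condition = '.level != "debug"'

# Suppress exact duplicates based on selected fields
[transforms.dedupe_logs]
type = "dedupe"
inputs = ["drop_debug"]
fields.match = ["message", "host"]
```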
3. Sinks
Data output to 60+ destinations:
- loki: Push to Grafana Loki
- elasticsearch: Index to Elasticsearch
- prometheus_exporter: Expose metrics endpoint
- aws_s3: Archive to S3
- opentelemetry: Export via OTLP
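A minimal sink declaration, sketched against a hypothetical Elasticsearch endpoint; `bulk.index` accepts strftime specifiers for daily indices:

```toml
# Index processed logs into daily Elasticsearch indices
[sinks.es_logs]
type = "elasticsearch"
inputs = ["kubernetes_logs"]
endpoints = ["http://elasticsearch:9200"]  # placeholder endpoint
bulk.index = "vector-%Y.%m.%d"
```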
Agent + Aggregator Topology
(Figure: Vector agent + aggregator topology)
Agent Role (DaemonSet)
Lightweight collection on every node:
- Collect logs from local pods
- Minimal transformation
- Forward to aggregators
Aggregator Role (StatefulSet)
Centralized processing with persistence:
- Complex transformations
- Multi-destination routing
- Disk buffering for reliability
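The two roles connect over Vector's native protocol via the `vector` sink and source; a minimal sketch of each side (hostname and port are assumptions):

```toml
# Agent side: forward collected events to the aggregator tier
[sinks.to_aggregator]
type = "vector"
inputs = ["kubernetes_logs"]
address = "vector-aggregator:6000"

# Aggregator side: receive events from all agents
[sources.from_agents]
type = "vector"
address = "0.0.0.0:6000"
```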
VRL: Vector Remap Language
VRL is a purpose-built language for observability data transformation. It's:
- Type-safe: Errors caught at compile time, before the pipeline starts
- Performant: Compiled once at startup, not interpreted per event
- Fallible-aware: Explicit error handling for operations that can fail
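Fallibility is the property worth internalizing first: a fallible call like parse_json must be handled with `!` (abort on error), `??` (fallback value), or an explicit error variable. A small sketch using the error-variable form:

```toml
[transforms.safe_parse]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
# Capture the error instead of aborting on malformed JSON
parsed, err = parse_json(.message)
if err == null && is_object(parsed) {
. = merge!(., parsed)
} else {
.parse_error = true
}
'''
```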
Basic Transformation
[transforms.parse_logs]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
# Parse JSON from message
. = parse_json!(.message)
# Add timestamp
.processed_at = now()
# Set environment
.environment = "production"
'''
AI/ML Log Processing
[transforms.ml_logs]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
# Parse structured logs
. = parse_json(.message) ?? .
# Extract model metrics
if exists(.model) && exists(.latency_ms) {
.metrics = {
"model": .model,
"latency_ms": to_int!(.latency_ms),
"tokens_used": to_int(.total_tokens) ?? 0,
"estimated_cost": to_float(.total_tokens) ?? 0.0 * 0.00002
}
}
# Classify severity
.severity = if contains(string(.level) ?? "", "error") {
"error"
} else if contains(string(.level) ?? "", "warn") {
"warning"
} else {
"info"
}
'''
Routing by Condition
[transforms.route_logs]
type = "route"
inputs = ["parse_logs"]
[transforms.route_logs.route]
errors = '.severity == "error"'
ml_events = '.pipeline == "ml" && exists(.metrics)'
default = 'true'
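Each named route becomes an output that downstream components reference as `<transform>.<route>`; events matching no route land on the built-in `_unmatched` output. A sketch using console sinks as stand-in destinations:

```toml
# Consume the "errors" route
[sinks.error_console]
type = "console"
inputs = ["route_logs.errors"]
encoding.codec = "json"

# Catch anything no route matched
[sinks.unmatched_console]
type = "console"
inputs = ["route_logs._unmatched"]
encoding.codec = "json"
```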
Log to Metric Conversion
[transforms.inference_metrics]
type = "log_to_metric"
inputs = ["ml_logs"]
[[transforms.inference_metrics.metrics]]
type = "counter"
field = "metrics.tokens_used"
name = "inference_tokens_total"
tags = { model = "{{metrics.model}}" }
[[transforms.inference_metrics.metrics]]
type = "histogram"
field = "metrics.latency_ms"
name = "inference_latency_ms"
tags = { model = "{{metrics.model}}" }
Complete Configuration Example
# vector.toml - AI/ML Observability Pipeline
# ============ SOURCES ============
[sources.kubernetes_logs]
type = "kubernetes_logs"
auto_partial_merge = true
[sources.otel_logs]
type = "opentelemetry"
[sources.otel_logs.grpc]
address = "0.0.0.0:4317"
[sources.otel_logs.http]
address = "0.0.0.0:4318"
[sources.host_metrics]
type = "host_metrics"
collectors = ["cpu", "memory", "disk", "network"]
# ============ TRANSFORMS ============
[transforms.parse_app_logs]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
# Parse structured logs
structured, err = parse_json(.message)
if err == null && is_object(structured) {
. = merge!(., structured)
del(.message)
}
# Add Kubernetes context
.k8s = {
"namespace": .kubernetes.pod_namespace,
"pod": .kubernetes.pod_name,
"container": .kubernetes.container_name
}
'''
[transforms.filter_noise]
type = "filter"
inputs = ["parse_app_logs"]
condition = '''
!contains(string(.message) ?? "", "healthcheck") &&
!contains(string(.message) ?? "", "readiness")
'''
[transforms.route_by_destination]
type = "route"
inputs = ["filter_noise"]
[transforms.route_by_destination.route]
errors = '.level == "error" || .level == "ERROR"'
all = 'true'
# ============ SINKS ============
[sinks.loki_all]
type = "loki"
inputs = ["route_by_destination.all"]
endpoint = "http://loki:3100"
encoding.codec = "json"
[sinks.loki_all.labels]
namespace = "{{ k8s.namespace }}"
pod = "{{ k8s.pod }}"
level = "{{ level }}"
[sinks.loki_errors]
type = "loki"
inputs = ["route_by_destination.errors"]
endpoint = "http://loki:3100"
encoding.codec = "json"
[sinks.loki_errors.labels]
stream = "errors"
[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["host_metrics"]
address = "0.0.0.0:9598"
[sinks.s3_archive]
type = "aws_s3"
inputs = ["route_by_destination.all"]
bucket = "logs-archive"
key_prefix = "logs/%Y/%m/%d/"
compression = "gzip"
encoding.codec = "ndjson"
Kubernetes Deployment
Agent (DaemonSet)
# values-agent.yaml
role: "Agent"
customConfig:
sources:
kubernetes_logs:
type: kubernetes_logs
transforms:
parse:
type: remap
inputs: ["kubernetes_logs"]
source: |
. = parse_json(.message) ?? .
sinks:
aggregator:
type: vector
inputs: ["parse"]
address: "vector-aggregator:6000"
Aggregator (StatefulSet)
# values-aggregator.yaml
role: "Aggregator"
persistence:
enabled: true
size: 50Gi
customConfig:
sources:
agents:
type: vector
address: "0.0.0.0:6000"
sinks:
loki:
type: loki
inputs: ["agents"]
endpoint: "http://loki:3100"
Installation
# Add Helm repo
helm repo add vector https://helm.vector.dev
helm repo update
# Install Agent
helm install vector-agent vector/vector \
--namespace observability \
-f values-agent.yaml
# Install Aggregator
helm install vector-aggregator vector/vector \
--namespace observability \
-f values-aggregator.yaml
OpenTelemetry Integration
Vector provides native OTLP support for seamless integration:
# Receive OTLP data (gRPC on 4317, HTTP on 4318)
[sources.otel]
type = "opentelemetry"
[sources.otel.grpc]
address = "0.0.0.0:4317"
tls.enabled = false
[sources.otel.http]
address = "0.0.0.0:4318"
# Process OTLP logs
[transforms.otel_process]
type = "remap"
inputs = ["otel"]
source = '''
.processed_at = now()
.processor = "vector"
'''
# Export to OTLP endpoint
[sinks.otel_export]
type = "opentelemetry"
inputs = ["otel_process"]
endpoint = "http://otel-collector:4317"
[sinks.otel_export.encoding]
codec = "native"
Best Practices
1. Agent + Aggregator Pattern
Use agents for collection, aggregators for processing. This provides:
- Reduced network traffic
- Centralized transformation logic
- Disk buffering at aggregators
2. Enable Disk Buffers
Prevent data loss during downstream outages:
[sinks.loki]
type = "loki"
buffer.type = "disk"
buffer.max_size = 5368709120 # 5GB
3. Test VRL Scripts
Use Vector's built-in testing:
# Test VRL transformation
echo '{"message": "{\"level\": \"error\"}"}' > event.json
echo '. = parse_json!(.message)' > program.vrl
vector vrl --input event.json --program program.vrl
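For regression coverage, Vector also supports unit tests declared alongside the config and run with `vector test`. A sketch targeting the parse_logs transform defined earlier:

```toml
# Unit test: feed a sample event into parse_logs and assert on its output
[[tests]]
name = "parse_logs extracts JSON fields"

[[tests.inputs]]
insert_at = "parse_logs"
type = "log"
log_fields.message = '{"level": "error"}'

[[tests.outputs]]
extract_from = "parse_logs"

[[tests.outputs.conditions]]
type = "vrl"
source = '.level == "error" && .environment == "production"'
```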
4. Monitor Vector Itself
Export Vector's internal metrics:
[sources.internal_metrics]
type = "internal_metrics"
[sinks.prometheus_internal]
type = "prometheus_exporter"
inputs = ["internal_metrics"]
address = "0.0.0.0:9599"
5. Resource Limits
Set appropriate K8s limits:
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 1000m
memory: 512Mi
Business Impact
| Metric | Improvement |
|---|---|
| Resource Usage | 70% reduction vs Fluentd/Logstash |
| Throughput | 10x higher events/sec |
| Reliability | Zero data loss with disk buffering |
| Flexibility | Route to multiple backends simultaneously |
| Cost | 30-50% lower infrastructure costs |
Key Takeaways
- Rust performance delivers 10x throughput vs JVM/Ruby alternatives
- VRL provides type-safe, performant data transformation
- Agent + Aggregator topology scales to any workload
- Native OTLP support integrates seamlessly with OpenTelemetry
- Disk buffering ensures zero data loss during outages