Vector: High-Performance Data Pipelines for Modern Observability

How Vector's Rust-based architecture delivers 10x better performance than alternatives for log routing, transformation, and multi-destination delivery in AI/ML platforms

GT
Gonnect Team
January 15, 202512 min read
VectorRustObservabilityData PipelinesOpenTelemetry

The Data Pipeline Challenge

Modern observability generates massive data volumes: logs from Kubernetes pods, metrics from application instrumentation, traces from distributed systems, and telemetry from AI/ML workloads. Traditional tools like Fluentd and Logstash struggle with this scale, consuming excessive resources and introducing latency.

Vector, built in Rust by Datadog, takes a fundamentally different approach: zero-copy processing, memory safety without garbage collection, and a unified pipeline for all observability data.

Why Vector?

Traditional Tools Pain Points

ToolLanguageMemoryThroughputGC Pauses
FluentdRuby~200MB~1TB/dayRuby GC
LogstashJava~500MB+~1TB/dayJVM GC
VectorRust~50MB10TB+/dayNone

Vector's Advantages

  • 10x performance: Rust's zero-cost abstractions
  • Memory efficient: No garbage collector, predictable memory usage
  • Type-safe transforms: VRL (Vector Remap Language) catches errors at parse time
  • Unified pipeline: Logs, metrics, and traces in one tool
  • 50+ sources, 60+ sinks: Connect anything to anything

Vector Architecture

Vector Pipeline Architecture

Loading diagram...

Core Concepts

1. Sources

Data ingestion from external systems. Vector supports 50+ source types:

  • kubernetes_logs: Native K8s log collection
  • file: Tail log files with checkpointing
  • kafka: Consume from Kafka topics
  • opentelemetry: Receive OTLP data
  • prometheus_scrape: Pull Prometheus metrics

2. Transforms

Data processing and manipulation using VRL:

  • remap: Transform events with VRL
  • filter: Drop events by condition
  • route: Split streams by condition
  • aggregate: Window-based aggregations
  • dedupe: Remove duplicate events

3. Sinks

Data output to 60+ destinations:

  • loki: Push to Grafana Loki
  • elasticsearch: Index to Elasticsearch
  • prometheus_exporter: Expose metrics endpoint
  • aws_s3: Archive to S3
  • opentelemetry: Export via OTLP

Agent + Aggregator Topology

Vector Agent + Aggregator Topology

Loading diagram...

Agent Role (DaemonSet)

Lightweight collection on every node:

  • Collect logs from local pods
  • Minimal transformation
  • Forward to aggregators

Aggregator Role (StatefulSet)

Centralized processing with persistence:

  • Complex transformations
  • Multi-destination routing
  • Disk buffering for reliability

VRL: Vector Remap Language

VRL is a purpose-built language for observability data transformation. It's:

  • Type-safe: Errors caught at parse time
  • Performant: Compiles to native code
  • Fallible-aware: Explicit error handling

Basic Transformation

[transforms.parse_logs]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
  # Parse JSON from message
  . = parse_json!(.message)

  # Add timestamp
  .processed_at = now()

  # Set environment
  .environment = "production"
'''

AI/ML Log Processing

[transforms.ml_logs]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
  # Parse structured logs
  . = parse_json!(.message) ?? .

  # Extract model metrics
  if exists(.model) && exists(.latency_ms) {
    .metrics = {
      "model": .model,
      "latency_ms": to_int!(.latency_ms),
      "tokens_used": to_int(.total_tokens) ?? 0,
      "estimated_cost": to_float(.total_tokens) ?? 0.0 * 0.00002
    }
  }

  # Classify severity
  .severity = if contains(string!(.level), "error") {
    "error"
  } else if contains(string!(.level), "warn") {
    "warning"
  } else {
    "info"
  }
'''

Routing by Condition

[transforms.route_logs]
type = "route"
inputs = ["parse_logs"]

  [transforms.route_logs.route]
  errors = '.severity == "error"'
  ml_events = '.pipeline == "ml" && exists(.metrics)'
  default = true

Log to Metric Conversion

[transforms.inference_metrics]
type = "log_to_metric"
inputs = ["ml_logs"]

  [[transforms.inference_metrics.metrics]]
  type = "counter"
  field = "metrics.tokens_used"
  name = "inference_tokens_total"
  tags = { model = "{{metrics.model}}" }

  [[transforms.inference_metrics.metrics]]
  type = "histogram"
  field = "metrics.latency_ms"
  name = "inference_latency_ms"
  tags = { model = "{{metrics.model}}" }

Complete Configuration Example

# vector.toml - AI/ML Observability Pipeline

# ============ SOURCES ============

[sources.kubernetes_logs]
type = "kubernetes_logs"
auto_partial_merge = true

[sources.otel_logs]
type = "opentelemetry"
address = "0.0.0.0:4317"

[sources.host_metrics]
type = "host_metrics"
collectors = ["cpu", "memory", "disk", "network"]

# ============ TRANSFORMS ============

[transforms.parse_app_logs]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
  # Parse structured logs
  structured, err = parse_json(.message)
  if err == null {
    . = merge(., structured)
    del(.message)
  }

  # Add Kubernetes context
  .k8s = {
    "namespace": .kubernetes.pod_namespace,
    "pod": .kubernetes.pod_name,
    "container": .kubernetes.container_name
  }
'''

[transforms.filter_noise]
type = "filter"
inputs = ["parse_app_logs"]
condition = '''
  !contains(string(.message) ?? "", "healthcheck") &&
  !contains(string(.message) ?? "", "readiness")
'''

[transforms.route_by_destination]
type = "route"
inputs = ["filter_noise"]

  [transforms.route_by_destination.route]
  errors = '.level == "error" || .level == "ERROR"'
  all = true

# ============ SINKS ============

[sinks.loki_all]
type = "loki"
inputs = ["route_by_destination.all"]
endpoint = "http://loki:3100"
encoding.codec = "json"

  [sinks.loki_all.labels]
  namespace = "{{ k8s.namespace }}"
  pod = "{{ k8s.pod }}"
  level = "{{ level }}"

[sinks.loki_errors]
type = "loki"
inputs = ["route_by_destination.errors"]
endpoint = "http://loki:3100"
encoding.codec = "json"

  [sinks.loki_errors.labels]
  stream = "errors"

[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["host_metrics"]
address = "0.0.0.0:9598"

[sinks.s3_archive]
type = "aws_s3"
inputs = ["route_by_destination.all"]
bucket = "logs-archive"
key_prefix = "logs/%Y/%m/%d/"
compression = "gzip"
encoding.codec = "ndjson"

Kubernetes Deployment

Agent (DaemonSet)

# values-agent.yaml
role: "Agent"

customConfig:
  sources:
    kubernetes_logs:
      type: kubernetes_logs

  transforms:
    parse:
      type: remap
      inputs: ["kubernetes_logs"]
      source: |
        . = parse_json!(.message) ?? .

  sinks:
    aggregator:
      type: vector
      inputs: ["parse"]
      address: "vector-aggregator:6000"

Aggregator (StatefulSet)

# values-aggregator.yaml
role: "Aggregator"

persistence:
  enabled: true
  size: 50Gi

customConfig:
  sources:
    agents:
      type: vector
      address: "0.0.0.0:6000"

  sinks:
    loki:
      type: loki
      inputs: ["agents"]
      endpoint: "http://loki:3100"

Installation

# Add Helm repo
helm repo add vector https://helm.vector.dev
helm repo update

# Install Agent
helm install vector-agent vector/vector \
  --namespace observability \
  -f values-agent.yaml

# Install Aggregator
helm install vector-aggregator vector/vector \
  --namespace observability \
  -f values-aggregator.yaml

OpenTelemetry Integration

Vector provides native OTLP support for seamless integration:

# Receive OTLP data
[sources.otel]
type = "opentelemetry"
address = "0.0.0.0:4317"
grpc.tls.enabled = false

# Process OTLP logs
[transforms.otel_process]
type = "remap"
inputs = ["otel"]
source = '''
  .processed_at = now()
  .processor = "vector"
'''

# Export to OTLP endpoint
[sinks.otel_export]
type = "opentelemetry"
inputs = ["otel_process"]
endpoint = "http://otel-collector:4317"

  [sinks.otel_export.encoding]
  codec = "native"

Best Practices

1. Agent + Aggregator Pattern

Use agents for collection, aggregators for processing. This provides:

  • Reduced network traffic
  • Centralized transformation logic
  • Disk buffering at aggregators

2. Enable Disk Buffers

Prevent data loss during downstream outages:

[sinks.loki]
type = "loki"
buffer.type = "disk"
buffer.max_size = 5368709120  # 5GB

3. Test VRL Scripts

Use Vector's built-in testing:

# Test VRL transformation
vector vrl --input '{"message": "{\"level\": \"error\"}"}' \
  --program '. = parse_json!(.message)'

4. Monitor Vector Itself

Export Vector's internal metrics:

[sources.internal_metrics]
type = "internal_metrics"

[sinks.prometheus_internal]
type = "prometheus_exporter"
inputs = ["internal_metrics"]
address = "0.0.0.0:9599"

5. Resource Limits

Set appropriate K8s limits:

resources:
  requests:
    cpu: 100m
    memory: 128Mi
  limits:
    cpu: 1000m
    memory: 512Mi

Business Impact

MetricImprovement
Resource Usage70% reduction vs Fluentd/Logstash
Throughput10x higher events/sec
ReliabilityZero data loss with disk buffering
FlexibilityRoute to multiple backends simultaneously
Cost30-50% lower infrastructure costs

Key Takeaways

  1. Rust performance delivers 10x throughput vs JVM/Ruby alternatives
  2. VRL provides type-safe, performant data transformation
  3. Agent + Aggregator topology scales to any workload
  4. Native OTLP support integrates seamlessly with OpenTelemetry
  5. Disk buffering ensures zero data loss during outages

Further Reading