Metrics Collection

Version: 1.0 Date: October 08, 2025 SPDX-License-Identifier: BSD-3-Clause License File: See the LICENSE file in the project root. Copyright: © 2025 Michael Gardner, A Bit of Help, Inc. Authors: Michael Gardner, Claude Code Status: Active

Overview

The pipeline system implements comprehensive metrics collection for monitoring, observability, and performance analysis. Metrics are collected in real-time, exported in Prometheus format, and can be visualized using Grafana dashboards.

Key Features:

  • Prometheus Integration: Native Prometheus metrics export
  • Real-Time Collection: Low-overhead metric updates
  • Comprehensive Coverage: Performance, system, and business metrics
  • Dimensional Data: Labels for multi-dimensional analysis
  • Thread-Safe: Safe concurrent metric updates

Metrics Architecture

Two-Level Metrics System

The system uses a two-level metrics architecture:

┌─────────────────────────────────────────────┐
│         Application Layer                   │
│  - ProcessingMetrics (domain entity)        │
│  - Per-pipeline performance tracking        │
│  - Business metrics and analytics           │
└─────────────────┬───────────────────────────┘
                  │
                  ↓
┌─────────────────────────────────────────────┐
│       Infrastructure Layer                  │
│  - MetricsService (Prometheus)              │
│  - System-wide aggregation                  │
│  - HTTP export endpoint                     │
└─────────────────────────────────────────────┘

Domain Metrics (ProcessingMetrics):

  • Attached to processing context
  • Track individual pipeline execution
  • Support detailed analytics
  • Persist to database

Infrastructure Metrics (MetricsService):

  • System-wide aggregation
  • Prometheus counters, gauges, histograms
  • HTTP /metrics endpoint
  • Real-time monitoring

Domain Metrics

ProcessingMetrics Entity

Tracks performance data for individual pipeline executions:

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::ProcessingMetrics;
use std::time::{Duration, Instant};
use std::collections::HashMap;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingMetrics {
    // Progress tracking
    bytes_processed: u64,
    bytes_total: u64,
    chunks_processed: u64,
    chunks_total: u64,

    // Timing (high-resolution internal, RFC3339 for export)
    #[serde(skip)]
    start_time: Option<Instant>,
    #[serde(skip)]
    end_time: Option<Instant>,
    start_time_rfc3339: Option<String>,
    end_time_rfc3339: Option<String>,
    processing_duration: Option<Duration>,

    // Performance metrics
    throughput_bytes_per_second: f64,
    compression_ratio: Option<f64>,

    // Error tracking
    error_count: u64,
    warning_count: u64,

    // File information
    input_file_size_bytes: u64,
    output_file_size_bytes: u64,
    input_file_checksum: Option<String>,
    output_file_checksum: Option<String>,

    // Per-stage metrics
    stage_metrics: HashMap<String, StageMetrics>,
}
}

Creating and Using ProcessingMetrics

#![allow(unused)]
fn main() {
impl ProcessingMetrics {
    /// Create new metrics for pipeline execution
    pub fn new(total_bytes: u64, total_chunks: u64) -> Self {
        Self {
            bytes_processed: 0,
            bytes_total: total_bytes,
            chunks_processed: 0,
            chunks_total: total_chunks,
            start_time: Some(Instant::now()),
            start_time_rfc3339: Some(Utc::now().to_rfc3339()),
            end_time: None,
            end_time_rfc3339: None,
            processing_duration: None,
            throughput_bytes_per_second: 0.0,
            compression_ratio: None,
            error_count: 0,
            warning_count: 0,
            input_file_size_bytes: total_bytes,
            output_file_size_bytes: 0,
            input_file_checksum: None,
            output_file_checksum: None,
            stage_metrics: HashMap::new(),
        }
    }

    /// Update progress
    pub fn update_progress(&mut self, bytes: u64, chunks: u64) {
        self.bytes_processed += bytes;
        self.chunks_processed += chunks;
        self.update_throughput();
    }

    /// Calculate throughput
    fn update_throughput(&mut self) {
        if let Some(start) = self.start_time {
            let elapsed = start.elapsed().as_secs_f64();
            if elapsed > 0.0 {
                self.throughput_bytes_per_second =
                    self.bytes_processed as f64 / elapsed;
            }
        }
    }

    /// Complete processing
    pub fn complete(&mut self) {
        self.end_time = Some(Instant::now());
        self.end_time_rfc3339 = Some(Utc::now().to_rfc3339());

        if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
            self.processing_duration = Some(end - start);
        }

        self.update_throughput();
        self.calculate_compression_ratio();
    }

    /// Calculate compression ratio
    fn calculate_compression_ratio(&mut self) {
        if self.output_file_size_bytes > 0 && self.input_file_size_bytes > 0 {
            self.compression_ratio = Some(
                self.output_file_size_bytes as f64 /
                self.input_file_size_bytes as f64
            );
        }
    }
}
}

StageMetrics

Track performance for individual pipeline stages:

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageMetrics {
    /// Stage name
    stage_name: String,

    /// Bytes processed by this stage
    bytes_processed: u64,

    /// Processing time for this stage
    #[serde(skip)]
    processing_time: Option<Duration>,

    /// Throughput (bytes per second)
    throughput_bps: f64,

    /// Error count for this stage
    error_count: u64,

    /// Memory usage (optional)
    memory_usage_bytes: Option<u64>,
}

impl ProcessingMetrics {
    /// Record stage metrics
    pub fn record_stage_metrics(
        &mut self,
        stage_name: String,
        bytes: u64,
        duration: Duration,
    ) {
        let throughput = if duration.as_secs_f64() > 0.0 {
            bytes as f64 / duration.as_secs_f64()
        } else {
            0.0
        };

        let stage_metrics = StageMetrics {
            stage_name: stage_name.clone(),
            bytes_processed: bytes,
            processing_time: Some(duration),
            throughput_bps: throughput,
            error_count: 0,
            memory_usage_bytes: None,
        };

        self.stage_metrics.insert(stage_name, stage_metrics);
    }
}
}

Prometheus Metrics

MetricsService

Infrastructure service for Prometheus metrics:

#![allow(unused)]
fn main() {
use prometheus::{
    IntCounter, IntGauge, Gauge, Histogram,
    HistogramOpts, Opts, Registry
};
use std::sync::Arc;

#[derive(Clone)]
pub struct MetricsService {
    registry: Arc<Registry>,

    // Pipeline execution metrics
    pipelines_processed_total: IntCounter,
    pipeline_processing_duration: Histogram,
    pipeline_bytes_processed_total: IntCounter,
    pipeline_chunks_processed_total: IntCounter,
    pipeline_errors_total: IntCounter,
    pipeline_warnings_total: IntCounter,

    // Performance metrics
    throughput_mbps: Gauge,
    compression_ratio: Gauge,
    active_pipelines: IntGauge,

    // System metrics
    memory_usage_bytes: IntGauge,
    cpu_utilization_percent: Gauge,
}

impl MetricsService {
    pub fn new() -> Result<Self, PipelineError> {
        let registry = Arc::new(Registry::new());

        // Create counters
        let pipelines_processed_total = IntCounter::new(
            "pipeline_processed_total",
            "Total number of pipelines processed"
        )?;

        let pipeline_bytes_processed_total = IntCounter::new(
            "pipeline_bytes_processed_total",
            "Total bytes processed by pipelines"
        )?;

        let pipeline_errors_total = IntCounter::new(
            "pipeline_errors_total",
            "Total number of processing errors"
        )?;

        // Create histograms
        let pipeline_processing_duration = Histogram::with_opts(
            HistogramOpts::new(
                "pipeline_processing_duration_seconds",
                "Pipeline processing duration in seconds"
            )
            .buckets(vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0])
        )?;

        // Create gauges
        let throughput_mbps = Gauge::new(
            "pipeline_throughput_mbps",
            "Current processing throughput in MB/s"
        )?;

        let compression_ratio = Gauge::new(
            "pipeline_compression_ratio",
            "Current compression ratio"
        )?;

        let active_pipelines = IntGauge::new(
            "pipeline_active_count",
            "Number of currently active pipelines"
        )?;

        // Register all metrics
        registry.register(Box::new(pipelines_processed_total.clone()))?;
        registry.register(Box::new(pipeline_bytes_processed_total.clone()))?;
        registry.register(Box::new(pipeline_errors_total.clone()))?;
        registry.register(Box::new(pipeline_processing_duration.clone()))?;
        registry.register(Box::new(throughput_mbps.clone()))?;
        registry.register(Box::new(compression_ratio.clone()))?;
        registry.register(Box::new(active_pipelines.clone()))?;

        Ok(Self {
            registry,
            pipelines_processed_total,
            pipeline_processing_duration,
            pipeline_bytes_processed_total,
            pipeline_chunks_processed_total: /* ... */,
            pipeline_errors_total,
            pipeline_warnings_total: /* ... */,
            throughput_mbps,
            compression_ratio,
            active_pipelines,
            memory_usage_bytes: /* ... */,
            cpu_utilization_percent: /* ... */,
        })
    }
}
}

Recording Metrics

#![allow(unused)]
fn main() {
impl MetricsService {
    /// Record pipeline completion
    pub fn record_pipeline_completion(
        &self,
        metrics: &ProcessingMetrics
    ) -> Result<(), PipelineError> {
        // Increment counters
        self.pipelines_processed_total.inc();
        self.pipeline_bytes_processed_total
            .inc_by(metrics.bytes_processed());
        self.pipeline_chunks_processed_total
            .inc_by(metrics.chunks_processed());

        // Record duration
        if let Some(duration) = metrics.processing_duration() {
            self.pipeline_processing_duration
                .observe(duration.as_secs_f64());
        }

        // Update gauges
        self.throughput_mbps.set(
            metrics.throughput_bytes_per_second() / 1_000_000.0
        );

        if let Some(ratio) = metrics.compression_ratio() {
            self.compression_ratio.set(ratio);
        }

        // Record errors
        if metrics.error_count() > 0 {
            self.pipeline_errors_total
                .inc_by(metrics.error_count());
        }

        Ok(())
    }

    /// Record pipeline start
    pub fn record_pipeline_start(&self) {
        self.active_pipelines.inc();
    }

    /// Record pipeline end
    pub fn record_pipeline_end(&self) {
        self.active_pipelines.dec();
    }
}
}

Available Metrics

Counter Metrics

Monotonically increasing values:

Metric NameTypeDescription
pipeline_processed_totalCounterTotal pipelines processed
pipeline_bytes_processed_totalCounterTotal bytes processed
pipeline_chunks_processed_totalCounterTotal chunks processed
pipeline_errors_totalCounterTotal processing errors
pipeline_warnings_totalCounterTotal warnings

Gauge Metrics

Values that can increase or decrease:

Metric NameTypeDescription
pipeline_active_countGaugeCurrently active pipelines
pipeline_throughput_mbpsGaugeCurrent throughput (MB/s)
pipeline_compression_ratioGaugeCurrent compression ratio
pipeline_memory_usage_bytesGaugeMemory usage in bytes
pipeline_cpu_utilization_percentGaugeCPU utilization percentage

Histogram Metrics

Distribution of values with buckets:

Metric NameTypeBucketsDescription
pipeline_processing_duration_secondsHistogram0.1, 0.5, 1, 5, 10, 30, 60Processing duration
pipeline_chunk_size_bytesHistogram1K, 10K, 100K, 1M, 10MChunk size distribution
pipeline_stage_duration_secondsHistogram0.01, 0.1, 0.5, 1, 5Stage processing time

Metrics Export

HTTP Endpoint

Export metrics via HTTP for Prometheus scraping:

#![allow(unused)]
fn main() {
use warp::Filter;

pub fn metrics_endpoint(
    metrics_service: Arc<MetricsService>
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
    warp::path!("metrics")
        .and(warp::get())
        .map(move || {
            let encoder = prometheus::TextEncoder::new();
            let metric_families = metrics_service.registry.gather();
            let mut buffer = Vec::new();

            encoder.encode(&metric_families, &mut buffer)
                .unwrap_or_else(|e| {
                    eprintln!("Failed to encode metrics: {}", e);
                });

            warp::reply::with_header(
                buffer,
                "Content-Type",
                encoder.format_type()
            )
        })
}
}

Prometheus Configuration

Configure Prometheus to scrape metrics:

# prometheus.yml
scrape_configs:
  - job_name: 'pipeline'
    scrape_interval: 15s
    static_configs:
      - targets: ['localhost:9090']
    metrics_path: '/metrics'

Example Metrics Output

# HELP pipeline_processed_total Total number of pipelines processed
# TYPE pipeline_processed_total counter
pipeline_processed_total 1234

# HELP pipeline_bytes_processed_total Total bytes processed
# TYPE pipeline_bytes_processed_total counter
pipeline_bytes_processed_total 1073741824

# HELP pipeline_processing_duration_seconds Pipeline processing duration
# TYPE pipeline_processing_duration_seconds histogram
pipeline_processing_duration_seconds_bucket{le="0.1"} 45
pipeline_processing_duration_seconds_bucket{le="0.5"} 120
pipeline_processing_duration_seconds_bucket{le="1.0"} 280
pipeline_processing_duration_seconds_bucket{le="5.0"} 450
pipeline_processing_duration_seconds_bucket{le="10.0"} 500
pipeline_processing_duration_seconds_bucket{le="+Inf"} 520
pipeline_processing_duration_seconds_sum 2340.5
pipeline_processing_duration_seconds_count 520

# HELP pipeline_throughput_mbps Current processing throughput
# TYPE pipeline_throughput_mbps gauge
pipeline_throughput_mbps 125.7

# HELP pipeline_compression_ratio Current compression ratio
# TYPE pipeline_compression_ratio gauge
pipeline_compression_ratio 0.35

Integration with Processing

Automatic Metric Collection

Metrics are automatically collected during processing:

#![allow(unused)]
fn main() {
use adaptive_pipeline::MetricsService;

async fn process_file_with_metrics(
    input_path: &str,
    output_path: &str,
    metrics_service: Arc<MetricsService>,
) -> Result<(), PipelineError> {
    // Create domain metrics
    let mut metrics = ProcessingMetrics::new(
        file_size,
        chunk_count
    );

    // Record start
    metrics_service.record_pipeline_start();

    // Process file
    for chunk in chunks {
        let start = Instant::now();

        let processed = process_chunk(chunk)?;

        metrics.update_progress(
            processed.len() as u64,
            1
        );

        metrics.record_stage_metrics(
            "compression".to_string(),
            processed.len() as u64,
            start.elapsed()
        );
    }

    // Complete metrics
    metrics.complete();

    // Export to Prometheus
    metrics_service.record_pipeline_completion(&metrics)?;
    metrics_service.record_pipeline_end();

    Ok(())
}
}

Observer Pattern

Use observer pattern for automatic metric updates:

#![allow(unused)]
fn main() {
pub struct MetricsObserver {
    metrics_service: Arc<MetricsService>,
}

impl MetricsObserver {
    pub fn observe_processing(
        &self,
        event: ProcessingEvent
    ) {
        match event {
            ProcessingEvent::PipelineStarted => {
                self.metrics_service.record_pipeline_start();
            }
            ProcessingEvent::ChunkProcessed { bytes, duration } => {
                self.metrics_service.pipeline_bytes_processed_total
                    .inc_by(bytes);
                self.metrics_service.pipeline_processing_duration
                    .observe(duration.as_secs_f64());
            }
            ProcessingEvent::PipelineCompleted { metrics } => {
                self.metrics_service
                    .record_pipeline_completion(&metrics)
                    .ok();
                self.metrics_service.record_pipeline_end();
            }
            ProcessingEvent::Error => {
                self.metrics_service.pipeline_errors_total.inc();
            }
        }
    }
}
}

Visualization with Grafana

Dashboard Configuration

Create Grafana dashboard for visualization:

{
  "dashboard": {
    "title": "Pipeline Metrics",
    "panels": [
      {
        "title": "Throughput",
        "targets": [{
          "expr": "rate(pipeline_bytes_processed_total[5m]) / 1000000",
          "legendFormat": "Throughput (MB/s)"
        }]
      },
      {
        "title": "Processing Duration (P95)",
        "targets": [{
          "expr": "histogram_quantile(0.95, rate(pipeline_processing_duration_seconds_bucket[5m]))",
          "legendFormat": "P95 Duration"
        }]
      },
      {
        "title": "Active Pipelines",
        "targets": [{
          "expr": "pipeline_active_count",
          "legendFormat": "Active"
        }]
      },
      {
        "title": "Error Rate",
        "targets": [{
          "expr": "rate(pipeline_errors_total[5m])",
          "legendFormat": "Errors/sec"
        }]
      }
    ]
  }
}

Common Queries

Throughput over time:

rate(pipeline_bytes_processed_total[5m]) / 1000000

Average processing duration:

rate(pipeline_processing_duration_seconds_sum[5m]) /
rate(pipeline_processing_duration_seconds_count[5m])

P99 latency:

histogram_quantile(0.99,
  rate(pipeline_processing_duration_seconds_bucket[5m])
)

Error rate:

rate(pipeline_errors_total[5m])

Compression effectiveness:

avg(pipeline_compression_ratio)

Performance Considerations

Low-Overhead Updates

Metrics use atomic operations for minimal overhead:

#![allow(unused)]
fn main() {
// ✅ GOOD: Atomic increment
self.counter.inc();

// ❌ BAD: Locking for simple increment
let mut guard = self.counter.lock().unwrap();
*guard += 1;
}

Batch Updates

Batch metric updates when possible:

#![allow(unused)]
fn main() {
// ✅ GOOD: Batch update
self.pipeline_bytes_processed_total.inc_by(total_bytes);

// ❌ BAD: Multiple individual updates
for _ in 0..total_bytes {
    self.pipeline_bytes_processed_total.inc();
}
}

Efficient Labels

Use labels judiciously to avoid cardinality explosion:

#![allow(unused)]
fn main() {
// ✅ GOOD: Limited cardinality
let counter = register_int_counter_vec!(
    "pipeline_processed_total",
    "Total pipelines processed",
    &["algorithm", "stage"]  // ~10 algorithms × ~5 stages = 50 series
)?;

// ❌ BAD: High cardinality
let counter = register_int_counter_vec!(
    "pipeline_processed_total",
    "Total pipelines processed",
    &["pipeline_id", "user_id"]  // Could be millions of series!
)?;
}

Alerting

Alert Rules

Define Prometheus alert rules:

# alerts.yml
groups:
  - name: pipeline_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(pipeline_errors_total[5m]) > 0.1
        for: 5m
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors/sec"

      - alert: LowThroughput
        expr: rate(pipeline_bytes_processed_total[5m]) < 1000000
        for: 10m
        annotations:
          summary: "Low throughput detected"
          description: "Throughput is {{ $value }} bytes/sec"

      - alert: HighLatency
        expr: |
          histogram_quantile(0.95,
            rate(pipeline_processing_duration_seconds_bucket[5m])
          ) > 60
        for: 5m
        annotations:
          summary: "High P95 latency"
          description: "P95 latency is {{ $value }}s"

Best Practices

Metric Naming

Follow Prometheus naming conventions:

#![allow(unused)]
fn main() {
// ✅ GOOD: Clear, consistent names
pipeline_bytes_processed_total      // Counter with _total suffix
pipeline_processing_duration_seconds // Time in base unit (seconds)
pipeline_active_count               // Gauge without suffix

// ❌ BAD: Inconsistent naming
processed_bytes                     // Missing namespace
duration_ms                        // Wrong unit (use seconds)
active                             // Too vague
}

Unit Consistency

Always use base units:

#![allow(unused)]
fn main() {
// ✅ GOOD: Base units
duration_seconds: f64              // Seconds, not milliseconds
size_bytes: u64                    // Bytes, not KB/MB
ratio: f64                         // Unitless ratio 0.0-1.0

// ❌ BAD: Non-standard units
duration_ms: u64
size_mb: f64
percentage: u8
}

Documentation

Document all metrics:

#![allow(unused)]
fn main() {
// ✅ GOOD: Well documented
let counter = IntCounter::with_opts(
    Opts::new(
        "pipeline_processed_total",
        "Total number of pipelines processed successfully. \
         Incremented on completion of each pipeline execution."
    )
)?;
}

Next Steps

Now that you understand metrics collection: