Logging Implementation

Version: 1.0 Date: 2025-10-04 License: BSD-3-Clause Copyright: (c) 2025 Michael Gardner, A Bit of Help, Inc. Authors: Michael Gardner Status: Active


Overview

The Adaptive Pipeline uses structured logging via the tracing crate to provide comprehensive observability throughout the system. This chapter details the logging architecture, implementation patterns, and best practices.

Key Features

  • Structured Logging: Rich, structured log events with contextual metadata
  • Two-Phase Architecture: Separate bootstrap and application logging
  • Hierarchical Levels: Traditional log levels (ERROR, WARN, INFO, DEBUG, TRACE)
  • Targeted Filtering: Fine-grained control via log targets
  • Integration: Seamless integration with ObservabilityService and metrics
  • Performance: Low-overhead logging with compile-time filtering
  • Testability: Trait-based abstractions for testing

Architecture

Two-Phase Logging System

The system employs a two-phase logging approach to handle different initialization stages:

┌─────────────────────────────────────────────────────────────┐
│                    Application Lifecycle                     │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Phase 1: Bootstrap                Phase 2: Application     │
│  ┌───────────────────┐             ┌──────────────────────┐ │
│  │ BootstrapLogger   │────────────>│ Tracing Subscriber   │ │
│  │ (Early init)      │             │ (Full featured)      │ │
│  ├───────────────────┤             ├──────────────────────┤ │
│  │ - Simple API      │             │ - Structured events  │ │
│  │ - No dependencies │             │ - Span tracking      │ │
│  │ - Testable        │             │ - Context propagation│ │
│  │ - Minimal overhead│             │ - External outputs   │ │
│  └───────────────────┘             └──────────────────────┘ │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Phase 1: Bootstrap Logger

Located in bootstrap/src/logger.rs, the bootstrap logger provides minimal logging during early initialization:

  • Minimal dependencies: No heavy tracing infrastructure
  • Trait-based abstraction: BootstrapLogger trait for testability
  • Simple API: Only 4 log levels (error, warn, info, debug)
  • Early availability: Available before tracing subscriber initialization

Phase 2: Application Logging

Once the application is fully initialized, the full tracing infrastructure is used:

  • Rich structured logging: Fields, spans, and events
  • Multiple subscribers: Console, file, JSON, external systems
  • Performance tracking: Integration with ObservabilityService
  • Distributed tracing: Context propagation for async operations

Log Levels

The system uses five hierarchical log levels, from most to least severe:

LevelMacroUse CaseExample
ERRORerror!()Fatal errors, unrecoverable failuresDatabase connection failure, file corruption
WARNwarn!()Non-fatal issues, degraded performanceHigh error rate alert, configuration warning
INFOinfo!()Normal operations, key milestonesPipeline started, file processed successfully
DEBUGdebug!()Detailed diagnostic informationStage execution details, chunk processing
TRACEtrace!()Very verbose debuggingFunction entry/exit, detailed state dumps

Level Guidelines

ERROR: Use sparingly for genuine failures that require attention

#![allow(unused)]
fn main() {
error!("Failed to connect to database: {}", err);
error!(pipeline_id = %id, "Pipeline execution failed: {}", err);
}

WARN: Use for concerning situations that don't prevent operation

#![allow(unused)]
fn main() {
warn!("High error rate: {:.1}% (threshold: {:.1}%)", rate, threshold);
warn!(stage = %name, "Stage processing slower than expected");
}

INFO: Use for important operational events

#![allow(unused)]
fn main() {
info!("Started pipeline processing: {}", pipeline_name);
info!(bytes = %bytes_processed, duration = ?elapsed, "File processing completed");
}

DEBUG: Use for detailed diagnostic information during development

#![allow(unused)]
fn main() {
debug!("Preparing stage: {}", stage.name());
debug!(chunk_count = chunks, size = bytes, "Processing chunk batch");
}

TRACE: Use for extremely detailed debugging (usually disabled in production)

#![allow(unused)]
fn main() {
trace!("Entering function with args: {:?}", args);
trace!(state = ?current_state, "State transition complete");
}

Bootstrap Logger

BootstrapLogger Trait

The bootstrap logger abstraction allows for different implementations:

#![allow(unused)]
fn main() {
/// Bootstrap logging abstraction
pub trait BootstrapLogger: Send + Sync {
    /// Log an error message (fatal errors during bootstrap)
    fn error(&self, message: &str);

    /// Log a warning message (non-fatal issues)
    fn warn(&self, message: &str);

    /// Log an info message (normal bootstrap progress)
    fn info(&self, message: &str);

    /// Log a debug message (detailed diagnostic information)
    fn debug(&self, message: &str);
}
}

ConsoleLogger Implementation

The production implementation wraps the tracing crate:

#![allow(unused)]
fn main() {
/// Console logger implementation using tracing
pub struct ConsoleLogger {
    prefix: String,
}

impl ConsoleLogger {
    /// Create a new console logger with default prefix
    pub fn new() -> Self {
        Self::with_prefix("bootstrap")
    }

    /// Create a new console logger with custom prefix
    pub fn with_prefix(prefix: impl Into<String>) -> Self {
        Self { prefix: prefix.into() }
    }
}

impl BootstrapLogger for ConsoleLogger {
    fn error(&self, message: &str) {
        tracing::error!(target: "bootstrap", "[{}] {}", self.prefix, message);
    }

    fn warn(&self, message: &str) {
        tracing::warn!(target: "bootstrap", "[{}] {}", self.prefix, message);
    }

    fn info(&self, message: &str) {
        tracing::info!(target: "bootstrap", "[{}] {}", self.prefix, message);
    }

    fn debug(&self, message: &str) {
        tracing::debug!(target: "bootstrap", "[{}] {}", self.prefix, message);
    }
}
}

NoOpLogger for Testing

A no-op implementation for testing scenarios where logging should be silent:

#![allow(unused)]
fn main() {
/// No-op logger for testing
pub struct NoOpLogger;

impl NoOpLogger {
    pub fn new() -> Self {
        Self
    }
}

impl BootstrapLogger for NoOpLogger {
    fn error(&self, _message: &str) {}
    fn warn(&self, _message: &str) {}
    fn info(&self, _message: &str) {}
    fn debug(&self, _message: &str) {}
}
}

Usage Example

#![allow(unused)]
fn main() {
use bootstrap::logger::{BootstrapLogger, ConsoleLogger};

fn bootstrap_application() -> Result<()> {
    let logger = ConsoleLogger::new();

    logger.info("Starting application bootstrap");
    logger.debug("Parsing command line arguments");

    match parse_config() {
        Ok(config) => {
            logger.info("Configuration loaded successfully");
            Ok(())
        }
        Err(e) => {
            logger.error(&format!("Failed to load configuration: {}", e));
            Err(e)
        }
    }
}
}

Application Logging with Tracing

Basic Logging Macros

Once the tracing subscriber is initialized, use the standard tracing macros:

#![allow(unused)]
fn main() {
use tracing::{trace, debug, info, warn, error};

fn process_pipeline(pipeline_id: &str) -> Result<()> {
    info!("Starting pipeline processing: {}", pipeline_id);

    debug!(pipeline_id = %pipeline_id, "Loading pipeline configuration");

    match execute_pipeline(pipeline_id) {
        Ok(result) => {
            info!(
                pipeline_id = %pipeline_id,
                bytes_processed = result.bytes,
                duration = ?result.duration,
                "Pipeline completed successfully"
            );
            Ok(result)
        }
        Err(e) => {
            error!(
                pipeline_id = %pipeline_id,
                error = %e,
                "Pipeline execution failed"
            );
            Err(e)
        }
    }
}
}

Structured Fields

Add structured fields to log events for better searchability and filtering:

#![allow(unused)]
fn main() {
// Simple field
info!(stage = "compression", "Stage started");

// Display formatting with %
info!(pipeline_id = %id, "Processing pipeline");

// Debug formatting with ?
debug!(config = ?pipeline_config, "Loaded configuration");

// Multiple fields
info!(
    pipeline_id = %id,
    stage = "encryption",
    bytes_processed = total_bytes,
    duration_ms = elapsed.as_millis(),
    "Stage completed"
);
}

Log Targets

Use targets to categorize and filter log events:

#![allow(unused)]
fn main() {
// Bootstrap logs
tracing::info!(target: "bootstrap", "Application starting");

// Domain events
tracing::debug!(target: "domain::pipeline", "Creating pipeline entity");

// Infrastructure events
tracing::debug!(target: "infrastructure::database", "Executing query");

// Metrics events
tracing::info!(target: "metrics", "Recording pipeline completion");
}

Example from Codebase

From pipeline/src/infrastructure/repositories/stage_executor.rs:

#![allow(unused)]
fn main() {
tracing::debug!(
    "Processing {} chunks with {} workers",
    chunks.len(),
    worker_count
);

tracing::info!(
    "Processed {} bytes in {:.2}s ({:.2} MB/s)",
    total_bytes,
    duration.as_secs_f64(),
    throughput_mbps
);

tracing::debug!(
    "Stage {} processed {} chunks successfully",
    stage_name,
    chunk_count
);
}

From bootstrap/src/signals.rs:

#![allow(unused)]
fn main() {
tracing::info!("Received SIGTERM, initiating graceful shutdown");
tracing::info!("Received SIGINT (Ctrl+C), initiating graceful shutdown");
tracing::error!("Failed to register SIGTERM handler: {}", e);
}

Integration with ObservabilityService

The logging system integrates with the ObservabilityService for comprehensive monitoring.

Automatic Logging in Operations

From pipeline/src/infrastructure/logging/observability_service.rs:

#![allow(unused)]
fn main() {
impl ObservabilityService {
    /// Complete operation tracking
    pub async fn complete_operation(
        &self,
        operation_name: &str,
        duration: Duration,
        success: bool,
        throughput_mbps: f64,
    ) {
        // ... update metrics ...

        info!(
            "Completed operation: {} in {:.2}s (throughput: {:.2} MB/s, success: {})",
            operation_name,
            duration.as_secs_f64(),
            throughput_mbps,
            success
        );

        // Check for alerts
        self.check_alerts(&tracker).await;
    }

    /// Check for alerts based on current metrics
    async fn check_alerts(&self, tracker: &PerformanceTracker) {
        if tracker.error_rate_percent > self.alert_thresholds.max_error_rate_percent {
            warn!(
                "🚨 Alert: High error rate {:.1}% (threshold: {:.1}%)",
                tracker.error_rate_percent,
                self.alert_thresholds.max_error_rate_percent
            );
        }

        if tracker.average_throughput_mbps < self.alert_thresholds.min_throughput_mbps {
            warn!(
                "🚨 Alert: Low throughput {:.2} MB/s (threshold: {:.2} MB/s)",
                tracker.average_throughput_mbps,
                self.alert_thresholds.min_throughput_mbps
            );
        }
    }
}
}

OperationTracker with Logging

The OperationTracker automatically logs operation lifecycle:

#![allow(unused)]
fn main() {
pub struct OperationTracker {
    operation_name: String,
    start_time: Instant,
    observability_service: ObservabilityService,
    completed: std::sync::atomic::AtomicBool,
}

impl OperationTracker {
    pub async fn complete(self, success: bool, bytes_processed: u64) {
        self.completed.store(true, std::sync::atomic::Ordering::Relaxed);

        let duration = self.start_time.elapsed();
        let throughput_mbps = calculate_throughput(bytes_processed, duration);

        // Logs completion via ObservabilityService.complete_operation()
        self.observability_service
            .complete_operation(&self.operation_name, duration, success, throughput_mbps)
            .await;
    }
}
}

Usage Pattern

#![allow(unused)]
fn main() {
// Start operation (increments active count, logs start)
let tracker = observability_service
    .start_operation("file_processing")
    .await;

// Do work...
let result = process_file(&input_path).await?;

// Complete operation (logs completion with metrics)
tracker.complete(true, result.bytes_processed).await;
}

Logging Configuration

Environment Variables

Control logging behavior via environment variables:

# Set log level (error, warn, info, debug, trace)
export RUST_LOG=info

# Set level per module
export RUST_LOG=pipeline=debug,bootstrap=info

# Set level per target
export RUST_LOG=infrastructure::database=debug

# Complex filtering
export RUST_LOG="info,pipeline::domain=debug,sqlx=warn"

Subscriber Initialization

In main.rs or bootstrap code:

#![allow(unused)]
fn main() {
use tracing_subscriber::{fmt, EnvFilter};

fn init_logging() -> Result<()> {
    // Initialize tracing subscriber with environment filter
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| EnvFilter::new("info"))
        )
        .with_target(true)
        .with_thread_ids(true)
        .with_line_number(true)
        .init();

    info!("Logging initialized");
    Ok(())
}
}

Advanced Subscriber Configuration

#![allow(unused)]
fn main() {
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Registry};
use tracing_appender::{non_blocking, rolling};

fn init_advanced_logging() -> Result<()> {
    // Console output
    let console_layer = fmt::layer()
        .with_target(true)
        .with_thread_ids(true);

    // File output with daily rotation
    let file_appender = rolling::daily("./logs", "pipeline.log");
    let (non_blocking_appender, _guard) = non_blocking(file_appender);
    let file_layer = fmt::layer()
        .with_writer(non_blocking_appender)
        .with_ansi(false)
        .json();

    // Combine layers
    let subscriber = Registry::default()
        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
        .with(console_layer)
        .with(file_layer);

    tracing::subscriber::set_global_default(subscriber)?;

    info!("Advanced logging initialized");
    Ok(())
}
}

Performance Considerations

Compile-Time Filtering

Tracing macros are compile-time filtered at the trace! and debug! levels in release builds:

#![allow(unused)]
fn main() {
// This has ZERO overhead in release builds if max_level_debug is not set
debug!("Expensive computation result: {:?}", expensive_calculation());
}

To enable debug/trace in release builds, add to Cargo.toml:

[dependencies]
tracing = { version = "0.1", features = ["max_level_debug"] }

Lazy Evaluation

Use closures for expensive operations:

#![allow(unused)]
fn main() {
// BAD: Always evaluates format_large_struct() even if debug is disabled
debug!("Large struct: {}", format_large_struct(&data));

// GOOD: Only evaluates if debug level is enabled
debug!(data = ?data, "Processing large struct");
}

Structured vs. Formatted

Prefer structured fields over formatted strings:

#![allow(unused)]
fn main() {
// Less efficient: String formatting always happens
info!("Processed {} bytes in {}ms", bytes, duration_ms);

// More efficient: Fields recorded directly
info!(bytes = bytes, duration_ms = duration_ms, "Processed data");
}

Async Performance

In async contexts, avoid blocking operations in log statements:

#![allow(unused)]
fn main() {
// BAD: Blocking call in log statement
info!("Config: {}", read_config_file_sync()); // Blocks async task!

// GOOD: Log after async operation completes
let config = read_config_file().await?;
info!(config = ?config, "Configuration loaded");
}

Benchmark Results

Logging overhead on Intel i7-10700K @ 3.8 GHz:

OperationTimeOverhead
info!() with 3 fields~80 nsNegligible
debug!() (disabled)~0 nsZero
debug!() (enabled) with 5 fields~120 nsMinimal
JSON serialization~500 nsLow
File I/O (async)~10 μsModerate

Recommendation: Log freely at info! and above. Use debug! and trace! judiciously.


Best Practices

✅ DO

Use structured fields for important data

#![allow(unused)]
fn main() {
info!(
    pipeline_id = %id,
    bytes = total_bytes,
    duration_ms = elapsed.as_millis(),
    "Pipeline completed"
);
}

Use display formatting (%) for user-facing types

#![allow(unused)]
fn main() {
info!(pipeline_id = %id, stage = %stage_name, "Processing stage");
}

Use debug formatting (?) for complex types

#![allow(unused)]
fn main() {
debug!(config = ?pipeline_config, "Loaded configuration");
}

Log errors with context

#![allow(unused)]
fn main() {
error!(
    pipeline_id = %id,
    stage = "encryption",
    error = %err,
    "Stage execution failed"
);
}

Use targets for filtering

#![allow(unused)]
fn main() {
tracing::debug!(target: "infrastructure::database", "Executing query: {}", sql);
}

Log at appropriate levels

#![allow(unused)]
fn main() {
// Error: Genuine failures
error!("Database connection lost: {}", err);

// Warn: Concerning but recoverable
warn!("Retry attempt {} of {}", attempt, max_attempts);

// Info: Important operational events
info!("Pipeline started successfully");

// Debug: Detailed diagnostic information
debug!("Stage prepared with {} workers", worker_count);
}

❌ DON'T

Don't log sensitive information

#![allow(unused)]
fn main() {
// BAD: Logging encryption keys
debug!("Encryption key: {}", key); // SECURITY RISK!

// GOOD: Log that operation happened without exposing secrets
debug!("Encryption key loaded from configuration");
}

Don't use println! or eprintln! in production code

#![allow(unused)]
fn main() {
// BAD
println!("Processing file: {}", path);

// GOOD
info!(path = %path, "Processing file");
}

Don't log in hot loops without rate limiting

#![allow(unused)]
fn main() {
// BAD: Logs millions of times
for chunk in chunks {
    debug!("Processing chunk {}", chunk.id); // Too verbose!
}

// GOOD: Log summary
debug!(chunk_count = chunks.len(), "Processing chunks");
// ... process chunks ...
info!(chunks_processed = chunks.len(), "Chunk processing complete");
}

Don't perform expensive operations in log statements

#![allow(unused)]
fn main() {
// BAD
debug!("Data: {}", expensive_serialization(&data));

// GOOD: Use debug formatting for lazy evaluation
debug!(data = ?data, "Processing data");
}

Don't duplicate metrics in logs

#![allow(unused)]
fn main() {
// BAD: Metrics already tracked separately
info!("Throughput: {} MB/s", throughput); // Redundant with MetricsService!

// GOOD: Log events, not metrics
info!("File processing completed successfully");
// MetricsService already tracks throughput
}

Testing Strategies

CapturingLogger for Tests

The bootstrap layer provides a capturing logger for tests:

#![allow(unused)]
fn main() {
#[cfg(test)]
pub struct CapturingLogger {
    messages: Arc<Mutex<Vec<LogMessage>>>,
}

#[cfg(test)]
impl CapturingLogger {
    pub fn new() -> Self {
        Self {
            messages: Arc::new(Mutex::new(Vec::new())),
        }
    }

    pub fn messages(&self) -> Vec<LogMessage> {
        self.messages.lock().unwrap().clone()
    }

    pub fn clear(&self) {
        self.messages.lock().unwrap().clear();
    }
}

#[cfg(test)]
impl BootstrapLogger for CapturingLogger {
    fn error(&self, message: &str) {
        self.log(LogLevel::Error, message);
    }

    fn warn(&self, message: &str) {
        self.log(LogLevel::Warn, message);
    }

    fn info(&self, message: &str) {
        self.log(LogLevel::Info, message);
    }

    fn debug(&self, message: &str) {
        self.log(LogLevel::Debug, message);
    }
}
}

Test Usage

#![allow(unused)]
fn main() {
#[test]
fn test_bootstrap_logging() {
    let logger = CapturingLogger::new();

    logger.info("Starting operation");
    logger.debug("Detailed diagnostic");
    logger.error("Operation failed");

    let messages = logger.messages();
    assert_eq!(messages.len(), 3);
    assert_eq!(messages[0].level, LogLevel::Info);
    assert_eq!(messages[0].message, "Starting operation");
    assert_eq!(messages[2].level, LogLevel::Error);
}
}

Testing with tracing-subscriber

For testing application-level logging:

#![allow(unused)]
fn main() {
use tracing_subscriber::{fmt, EnvFilter};

#[test]
fn test_application_logging() {
    // Initialize test subscriber
    let subscriber = fmt()
        .with_test_writer()
        .with_env_filter(EnvFilter::new("debug"))
        .finish();

    tracing::subscriber::with_default(subscriber, || {
        // Test code that produces logs
        process_pipeline("test-pipeline");
    });
}
}

NoOp Logger for Silent Tests

#![allow(unused)]
fn main() {
#[test]
fn test_without_logging() {
    let logger = NoOpLogger::new();

    // Run tests without any log output
    let result = bootstrap_with_logger(&logger);

    assert!(result.is_ok());
}
}

Common Patterns

Request/Response Logging

#![allow(unused)]
fn main() {
pub async fn process_file(path: &Path) -> Result<ProcessingResult> {
    let request_id = Uuid::new_v4();

    info!(
        request_id = %request_id,
        path = %path.display(),
        "Processing file"
    );

    match do_processing(path).await {
        Ok(result) => {
            info!(
                request_id = %request_id,
                bytes_processed = result.bytes,
                duration_ms = result.duration.as_millis(),
                "File processed successfully"
            );
            Ok(result)
        }
        Err(e) => {
            error!(
                request_id = %request_id,
                error = %e,
                "File processing failed"
            );
            Err(e)
        }
    }
}
}

Progress Logging

#![allow(unused)]
fn main() {
pub async fn process_chunks(chunks: Vec<Chunk>) -> Result<()> {
    let total = chunks.len();

    info!(total_chunks = total, "Starting chunk processing");

    for (i, chunk) in chunks.iter().enumerate() {
        process_chunk(chunk).await?;

        // Log progress every 10%
        if (i + 1) % (total / 10).max(1) == 0 {
            let percent = ((i + 1) * 100) / total;
            info!(
                processed = i + 1,
                total = total,
                percent = percent,
                "Chunk processing progress"
            );
        }
    }

    info!(total_chunks = total, "Chunk processing completed");
    Ok(())
}
}

Error Context Logging

#![allow(unused)]
fn main() {
pub async fn execute_pipeline(id: &PipelineId) -> Result<()> {
    debug!(pipeline_id = %id, "Loading pipeline from database");

    let pipeline = repository.find_by_id(id).await
        .map_err(|e| {
            error!(
                pipeline_id = %id,
                error = %e,
                "Failed to load pipeline from database"
            );
            e
        })?;

    debug!(
        pipeline_id = %id,
        stage_count = pipeline.stages().len(),
        "Pipeline loaded successfully"
    );

    Ok(())
}
}

Conditional Logging

#![allow(unused)]
fn main() {
pub fn process_with_validation(data: &Data) -> Result<()> {
    if let Err(e) = validate(data) {
        warn!(
            validation_error = %e,
            "Data validation failed, attempting recovery"
        );

        return recover_from_validation_error(data, e);
    }

    debug!("Data validation passed");
    process(data)
}
}

Next Steps


References