Performance Optimization

Version: 0.1.0 Date: October 08, 2025 SPDX-License-Identifier: BSD-3-Clause License File: See the LICENSE file in the project root. Copyright: © 2025 Michael Gardner, A Bit of Help, Inc. Authors: Michael Gardner Status: Draft

This chapter explores performance optimization strategies for the adaptive pipeline, including benchmarking methodologies, tuning parameters, and common performance bottlenecks with their solutions.

Overview

The pipeline is designed for high-performance file processing with several optimization strategies:

  1. Adaptive Configuration: Automatically selects optimal settings based on file characteristics
  2. Parallel Processing: Leverages multi-core systems with Tokio and Rayon
  3. Resource Management: Prevents oversubscription with CPU/I/O token governance
  4. Memory Efficiency: Streaming processing with bounded memory usage
  5. I/O Optimization: Memory mapping, chunked I/O, and device-specific tuning

Performance Goals:

  • Throughput: 100-500 MB/s for compression/encryption pipelines
  • Latency: < 100 ms overhead for small files (< 10 MB)
  • Memory: Bounded memory usage regardless of file size
  • Scalability: Linear scaling up to available CPU cores

Performance Metrics

Throughput

Definition: Bytes processed per second

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::ProcessingMetrics;

let mut metrics = ProcessingMetrics::new();
metrics.start();

// ... process data ...
metrics.add_bytes_processed(file_size);

metrics.end();

println!("Throughput: {:.2} MB/s", metrics.throughput_mb_per_second());
}

Typical Values:

  • Uncompressed I/O: 500-2000 MB/s (limited by storage device)
  • LZ4 compression: 300-600 MB/s (fast, low compression)
  • Brotli compression: 50-150 MB/s (slow, high compression)
  • AES-256-GCM encryption: 400-800 MB/s (hardware-accelerated)
  • ChaCha20-Poly1305: 200-400 MB/s (software)

Latency

Definition: Time from the start of processing to its completion for a given file

Components:

  • Setup overhead: File opening, thread pool initialization (1-5 ms)
  • I/O time: Reading/writing chunks (varies by device and size)
  • Processing time: Compression, encryption, hashing (varies by algorithm)
  • Coordination overhead: Task spawning, semaphore acquisition (< 1 ms)

Optimization Strategies:

  • Minimize setup overhead by reusing resources
  • Use memory mapping for large files to reduce I/O time
  • Choose faster algorithms (LZ4 vs Brotli, ChaCha20 vs AES)
  • Batch small operations to amortize coordination overhead
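
Putting the components together gives a rough latency budget. Using the figures above, a 10 MB file compressed with LZ4 on a fast NVMe drive costs roughly 1-5 ms of setup, about 5 ms of I/O at ~2000 MB/s, about 20 ms of compression at ~500 MB/s, and under 1 ms of coordination, on the order of 30 ms total and comfortably within the < 100 ms goal for small files.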

Memory Usage

Formula:

Peak Memory ≈ chunk_size × active_workers × files_concurrent

Example:

chunk_size = 64 MB
active_workers = 7
files_concurrent = 1
Peak Memory ≈ 64 MB × 7 × 1 = 448 MB
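
This estimate is cheap to compute before a run. A minimal sketch (the helper function and the 512 MB budget below are illustrative, not part of the pipeline API):

#![allow(unused)]
fn main() {
// Illustrative helper: peak buffer memory per the formula above.
fn estimated_peak_memory(chunk_size: usize, active_workers: usize, files_concurrent: usize) -> usize {
    chunk_size * active_workers * files_concurrent
}

let peak = estimated_peak_memory(64 * 1024 * 1024, 7, 1);
println!("Estimated peak: {} MB", peak / (1024 * 1024));  // 448 MB

// Compare against an assumed budget before starting the run.
let budget_bytes = 512 * 1024 * 1024;
assert!(peak <= budget_bytes, "configuration would exceed the memory budget");
}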

Monitoring:

#![allow(unused)]
fn main() {
use adaptive_pipeline::infrastructure::metrics::CONCURRENCY_METRICS;

let mem_mb = CONCURRENCY_METRICS.memory_used_mb();
let mem_pct = CONCURRENCY_METRICS.memory_utilization_percent();

println!("Memory: {:.2} MB ({:.1}%)", mem_mb, mem_pct);
}

Optimization Strategies

1. Chunk Size Optimization

Impact: Chunk size affects memory usage, I/O efficiency, and parallelism.

Adaptive Chunk Sizing:

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::value_objects::ChunkSize;

// Automatically selects optimal chunk size based on file size
let chunk_size = ChunkSize::optimal_for_file_size(file_size);

println!("Optimal chunk size: {}", chunk_size);  // e.g., "4.0MB"
}

Guidelines:

File Size           | Chunk Size  | Rationale
< 10 MB (small)     | 64-256 KB   | Minimize memory, enable fine-grained parallelism
10-100 MB (medium)  | 256 KB-1 MB | Balance memory and I/O efficiency
100 MB-1 GB (large) | 1-4 MB      | Reduce I/O overhead, acceptable memory usage
> 1 GB (huge)       | 4-16 MB     | Maximize I/O throughput, still bounded memory

Trade-offs:

  • Small chunks: ✅ Lower memory, better parallelism ❌ Higher I/O overhead
  • Large chunks: ✅ Lower I/O overhead ❌ Higher memory, less parallelism
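
The guidelines above reduce to a simple size-based lookup. A minimal sketch of that logic (illustrative thresholds; the actual ChunkSize::optimal_for_file_size implementation may differ):

#![allow(unused)]
fn main() {
// Sketch of the guideline table above; not the library's actual logic.
fn guideline_chunk_size(file_size: u64) -> usize {
    const KB: usize = 1024;
    const MB: usize = 1024 * KB;
    match file_size {
        s if s < 10 * MB as u64 => 256 * KB,    // small: fine-grained parallelism
        s if s < 100 * MB as u64 => MB,         // medium: balance memory and I/O
        s if s < 1024 * MB as u64 => 4 * MB,    // large: reduce I/O overhead
        _ => 16 * MB,                           // huge: maximize I/O throughput
    }
}
}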

2. Worker Count Optimization

Impact: Worker count affects CPU utilization and resource contention.

Adaptive Worker Count:

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::value_objects::WorkerCount;

// File size + system resources + processing type
let workers = WorkerCount::optimal_for_processing_type(
    file_size,
    available_cores,
    is_cpu_intensive,  // true for compression/encryption
);

println!("Optimal workers: {}", workers);  // e.g., "8 workers"
}

Empirically Validated Strategies:

File Size      | Worker Count | Strategy                      | Benchmark Result
5 MB (small)   | 9            | Aggressive parallelism        | +102% speedup
50 MB (medium) | 5            | Balanced approach             | +70% speedup
2 GB (huge)    | 3            | Conservative (avoid overhead) | +76% speedup

Why these strategies work:

  • Small files: Task overhead is amortized quickly with many workers
  • Medium files: Balanced to avoid both under-utilization and over-subscription
  • Huge files: Fewer workers prevent memory pressure and coordination overhead
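
Expressed as code, the strategy table reduces to a few size buckets. A sketch under the assumption that the benchmarked counts generalize (the real WorkerCount::optimal_for_processing_type also weighs core count and processing type):

#![allow(unused)]
fn main() {
// Sketch of the benchmark-guided strategy table; illustrative only.
fn benchmark_guided_workers(file_size: u64) -> usize {
    const MB: u64 = 1024 * 1024;
    const GB: u64 = 1024 * MB;
    if file_size <= 10 * MB {
        9   // small files: aggressive parallelism (+102% in benchmarks)
    } else if file_size <= 100 * MB {
        5   // medium files: balanced approach (+70%)
    } else if file_size >= GB {
        3   // huge files: conservative, avoid coordination overhead (+76%)
    } else {
        4   // 100 MB - 1 GB: moderate default
    }
}
}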

3. Memory Mapping vs Regular I/O

When to use memory mapping:

  • ✅ Files > 100 MB (amortizes setup cost)
  • ✅ Random access patterns (page cache efficiency)
  • ✅ Read-heavy workloads (no write overhead)

When to use regular I/O:

  • ✅ Files < 10 MB (lower setup cost)
  • ✅ Sequential access patterns (streaming)
  • ✅ Write-heavy workloads (buffered writes)
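
These rules of thumb fit a single decision helper. A sketch assuming a hypothetical AccessPattern hint (the shipped service is driven by FileIOConfig, shown next):

#![allow(unused)]
fn main() {
// Hypothetical access-pattern hint; not part of the pipeline API.
enum AccessPattern { Sequential, Random }

// Sketch of the heuristics above: prefer memory mapping for large,
// random-access, read-heavy work; prefer regular buffered I/O otherwise.
fn should_memory_map(file_size: u64, pattern: AccessPattern, read_heavy: bool) -> bool {
    const MB: u64 = 1024 * 1024;
    if file_size < 10 * MB {
        return false;  // small files: mmap setup cost dominates
    }
    match pattern {
        AccessPattern::Random => read_heavy,
        AccessPattern::Sequential => read_heavy && file_size > 100 * MB,
    }
}
}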

Configuration:

#![allow(unused)]
fn main() {
use adaptive_pipeline::infrastructure::adapters::file_io::TokioFileIO;
use adaptive_pipeline_domain::services::file_io_service::FileIOConfig;

let config = FileIOConfig {
    enable_memory_mapping: true,
    max_mmap_size: 1024 * 1024 * 1024,  // 1 GB threshold
    default_chunk_size: 64 * 1024,       // 64 KB chunks
    ..Default::default()
};

let service = TokioFileIO::new(config);
}

Benchmark Results (from pipeline/benches/file_io_benchmark.rs):

File Size | Regular I/O | Memory Mapping | Winner
1 MB      | 2000 MB/s   | 1500 MB/s      | Regular I/O
10 MB     | 1800 MB/s   | 1900 MB/s      | Comparable
50 MB     | 1500 MB/s   | 2200 MB/s      | Memory Mapping
100 MB    | 1400 MB/s   | 2500 MB/s      | Memory Mapping

4. Compression Algorithm Selection

Performance vs Compression Ratio:

Algorithm | Compression Speed | Decompression Speed | Ratio | Use Case
LZ4       | 500-700 MB/s      | 2000-3000 MB/s      | 2-3x  | Real-time, low latency
Zstd      | 200-400 MB/s      | 600-800 MB/s        | 3-5x  | Balanced, general use
Brotli    | 50-150 MB/s       | 300-500 MB/s        | 4-8x  | Storage, high compression

Adaptive Selection:

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::services::CompressionPriority;

// Automatic algorithm selection
let config = service.get_optimal_config(
    "data.bin",
    &sample_data,
    CompressionPriority::Speed,  // or CompressionPriority::Ratio
)?;

println!("Selected: {:?}", config.algorithm);
}

Guidelines:

  • Speed priority: LZ4 for streaming, real-time processing
  • Balanced: Zstandard for general-purpose use
  • Ratio priority: Brotli for archival, storage optimization

5. Encryption Algorithm Selection

Performance Characteristics:

Algorithm          | Throughput   | Security  | Hardware Support
AES-256-GCM        | 400-800 MB/s | Excellent | Yes (AES-NI)
ChaCha20-Poly1305  | 200-400 MB/s | Excellent | No
XChaCha20-Poly1305 | 180-350 MB/s | Excellent | No

Configuration:

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::services::EncryptionAlgorithm;

// Use AES-256-GCM if hardware support available
let algorithm = if has_aes_ni() {
    EncryptionAlgorithm::Aes256Gcm  // 2-4x faster with AES-NI
} else {
    EncryptionAlgorithm::ChaCha20Poly1305  // Software fallback
};
}
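
The snippet above assumes a has_aes_ni() helper. One way to write it with std's runtime feature detection (a sketch; the pipeline may detect hardware support differently):

#![allow(unused)]
fn main() {
// Runtime check for AES-NI; always false on non-x86_64 targets.
#[cfg(target_arch = "x86_64")]
fn has_aes_ni() -> bool {
    std::is_x86_feature_detected!("aes")
}

#[cfg(not(target_arch = "x86_64"))]
fn has_aes_ni() -> bool {
    false
}
}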

Common Bottlenecks

1. CPU Bottleneck

Symptoms:

  • CPU saturation > 80%
  • High CPU wait times (P95 > 50 ms)
  • Low I/O utilization

Causes:

  • Too many CPU-intensive operations (compression, encryption)
  • Insufficient worker count for CPU-bound work
  • Slow algorithms (Brotli on large files)

Solutions:

#![allow(unused)]
fn main() {
// Increase CPU tokens to match cores
let config = ResourceConfig {
    cpu_tokens: Some(available_cores),  // Use all cores
    ..Default::default()
};

// Use faster algorithms
let compression = CompressionAlgorithm::Lz4;  // Instead of Brotli
let encryption = EncryptionAlgorithm::Aes256Gcm;  // With AES-NI

// Optimize worker count
let workers = WorkerCount::optimal_for_processing_type(
    file_size,
    available_cores,
    true,  // CPU-intensive = true
);
}

2. I/O Bottleneck

Symptoms:

  • I/O saturation > 80%
  • High I/O wait times (P95 > 100 ms)
  • Low CPU utilization

Causes:

  • Too many concurrent I/O operations
  • Small chunk sizes causing excessive syscalls
  • Storage device queue depth exceeded

Solutions:

#![allow(unused)]
fn main() {
// Increase chunk size to reduce I/O overhead
let chunk_size = ChunkSize::from_mb(4)?;  // 4 MB chunks

// Reduce I/O concurrency for HDD
let config = ResourceConfig {
    storage_type: StorageType::HDD,  // 4 I/O tokens
    ..Default::default()
};

// Use memory mapping for large files
let use_mmap = file_size > 100 * 1024 * 1024;  // > 100 MB
}

I/O Optimization by Device:

Device Type | Optimal Chunk Size | I/O Tokens | Strategy
HDD         | 1-4 MB             | 4          | Sequential, large chunks
SSD         | 256 KB-1 MB        | 12         | Balanced
NVMe        | 64 KB-256 KB       | 24         | Parallel, small chunks
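
The table maps naturally onto a chunk-size choice plus an I/O token count. A sketch using mid-range values from the table (illustrative; it assumes an SSD variant alongside the HDD and NVMe variants used earlier, and actual defaults come from ResourceConfig):

#![allow(unused)]
fn main() {
// Sketch: chunk size (bytes) and I/O tokens per the device table above.
// Mid-range values; illustrative only.
fn device_tuned_settings(storage: StorageType) -> (usize, usize) {
    const KB: usize = 1024;
    const MB: usize = 1024 * KB;
    match storage {
        StorageType::HDD => (2 * MB, 4),      // sequential, large chunks
        StorageType::SSD => (512 * KB, 12),   // balanced
        StorageType::NVMe => (128 * KB, 24),  // parallel, small chunks
        _ => (MB, 12),                        // other devices: moderate defaults
    }
}
}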

3. Memory Bottleneck

Symptoms:

  • Memory utilization > 80%
  • Swapping (check vmstat)
  • OOM errors

Causes:

  • Too many concurrent chunks allocated
  • Large chunk size × high worker count
  • Memory leaks or unbounded buffers

Solutions:

#![allow(unused)]
fn main() {
// Reduce chunk size
let mut chunk_size = ChunkSize::from_mb(16)?;  // Smaller chunks

// Limit concurrent workers
let config = ResourceConfig {
    cpu_tokens: Some(3),  // Fewer workers = less memory
    ..Default::default()
};

// Monitor memory closely
if CONCURRENCY_METRICS.memory_utilization_percent() > 80.0 {
    warn!("High memory usage, reducing chunk size");
    chunk_size = ChunkSize::from_mb(8)?;
}
}

4. Coordination Overhead

Symptoms:

  • High task spawn latency
  • Context switching > 10k/sec
  • Low overall throughput despite low resource usage

Causes:

  • Too many small tasks (excessive spawn_blocking calls)
  • High semaphore contention
  • Channel backpressure

Solutions:

#![allow(unused)]
fn main() {
// Batch small operations
if chunks.len() < 10 {
    // Sequential for small batches (avoid spawn overhead)
    for chunk in chunks {
        process_chunk_sync(chunk)?;
    }
} else {
    // Parallel for large batches
    tokio::task::spawn_blocking(move || {
        RAYON_POOLS.cpu_bound_pool().install(|| {
            chunks.into_par_iter().map(process_chunk_sync).collect()
        })
    }).await??;
}

// Reduce worker count to lower contention
let workers = WorkerCount::new(available_cores / 2);
}

Tuning Parameters

Chunk Size Tuning

Parameters:

#![allow(unused)]
fn main() {
pub struct ChunkSize { /* ... */ }

// Associated constants live in an impl block
impl ChunkSize {
    pub const MIN_SIZE: usize = 1;                  // 1 byte
    pub const MAX_SIZE: usize = 512 * 1024 * 1024;  // 512 MB
    pub const DEFAULT_SIZE: usize = 1024 * 1024;    // 1 MB
}
}

Configuration:

#![allow(unused)]
fn main() {
// Via ChunkSize value object
let chunk_size = ChunkSize::from_mb(4)?;

// Via CLI/config file
let chunk_size_mb = 4;
let chunk_size = ChunkSize::from_mb(chunk_size_mb)?;

// Adaptive (recommended)
let chunk_size = ChunkSize::optimal_for_file_size(file_size);
}

Impact:

  • Memory: Directly proportional (2x chunk = 2x memory per worker)
  • I/O overhead: Inversely proportional (2x chunk = 0.5x syscalls)
  • Parallelism: Inversely proportional (2x chunk = 0.5x parallel units)
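
For example, doubling the chunk size from 1 MB to 2 MB with 8 active workers roughly doubles per-file buffer memory from 8 MB to 16 MB, halves the number of read/write calls, and halves the number of chunks that can be in flight in parallel.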

Worker Count Tuning

Parameters:

#![allow(unused)]
fn main() {
pub struct WorkerCount { /* ... */ }

// Associated constants live in an impl block
impl WorkerCount {
    pub const MIN_WORKERS: usize = 1;
    pub const MAX_WORKERS: usize = 32;
    pub const DEFAULT_WORKERS: usize = 4;
}
}

Configuration:

#![allow(unused)]
fn main() {
// Manual
let workers = WorkerCount::new(8);

// Adaptive (recommended)
let workers = WorkerCount::optimal_for_file_size(file_size);

// With system resources
let workers = WorkerCount::optimal_for_file_and_system(
    file_size,
    available_cores,
);

// With processing type
let workers = WorkerCount::optimal_for_processing_type(
    file_size,
    available_cores,
    is_cpu_intensive,
);
}

Impact:

  • Throughput: Generally increases with workers (up to cores)
  • Memory: Directly proportional (2x workers = 2x memory)
  • Context switching: Increases with workers (diminishing returns > 2x cores)

Resource Token Tuning

CPU Tokens:

#![allow(unused)]
fn main() {
let config = ResourceConfig {
    cpu_tokens: Some(7),  // cores - 1 (default)
    ..Default::default()
};
}

I/O Tokens:

#![allow(unused)]
fn main() {
let config = ResourceConfig {
    io_tokens: Some(24),          // Device-specific
    storage_type: StorageType::NVMe,
    ..Default::default()
};
}

Impact:

  • CPU tokens: Limits total CPU-bound parallelism across all files
  • I/O tokens: Limits total I/O concurrency across all files
  • Both: Prevent system oversubscription

Performance Monitoring

Real-Time Metrics

#![allow(unused)]
fn main() {
use adaptive_pipeline::infrastructure::metrics::CONCURRENCY_METRICS;
use std::time::Duration;

// Spawn monitoring task
tokio::spawn(async {
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    loop {
        interval.tick().await;

        // Resource saturation
        let cpu_sat = CONCURRENCY_METRICS.cpu_saturation_percent();
        let io_sat = CONCURRENCY_METRICS.io_saturation_percent();
        let mem_util = CONCURRENCY_METRICS.memory_utilization_percent();

        // Wait time percentiles
        let cpu_p95 = CONCURRENCY_METRICS.cpu_wait_p95();
        let io_p95 = CONCURRENCY_METRICS.io_wait_p95();

        info!(
            "Resources: CPU={:.1}%, I/O={:.1}%, Mem={:.1}% | Wait: CPU={}ms, I/O={}ms",
            cpu_sat, io_sat, mem_util, cpu_p95, io_p95
        );

        // Alert on issues
        if cpu_sat > 90.0 {
            warn!("CPU saturated - consider increasing workers or faster algorithms");
        }
        if mem_util > 80.0 {
            warn!("High memory - consider reducing chunk size or workers");
        }
    }
});
}

Processing Metrics

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::ProcessingMetrics;

let mut metrics = ProcessingMetrics::new();
metrics.start();

// Process file...
for chunk in chunks {
    metrics.add_bytes_processed(chunk.data.len() as u64);
}

metrics.end();

// Report performance
println!("Throughput: {:.2} MB/s", metrics.throughput_mb_per_second());
println!("Duration: {:.2}s", metrics.duration().as_secs_f64());
println!("Processed: {} MB", metrics.bytes_processed() / (1024 * 1024));

// Stage-specific metrics
for stage_metrics in metrics.stage_metrics() {
    println!("  {}: {:.2} MB/s", stage_metrics.stage_name, stage_metrics.throughput);
}
}

Performance Best Practices

1. Use Adaptive Configuration

#![allow(unused)]
fn main() {
// ✅ Good: Let the system optimize
let chunk_size = ChunkSize::optimal_for_file_size(file_size);
let workers = WorkerCount::optimal_for_processing_type(
    file_size,
    available_cores,
    is_cpu_intensive,
);

// ❌ Bad: Fixed values
let chunk_size = ChunkSize::from_mb(1)?;
let workers = WorkerCount::new(8);
}

2. Choose Appropriate Algorithms

#![allow(unused)]
fn main() {
// ✅ Good: Algorithm selection based on priority
let compression_config = service.get_optimal_config(
    file_extension,
    &sample_data,
    CompressionPriority::Speed,  // or Ratio
)?;

// ❌ Bad: Always use same algorithm
let compression_config = CompressionConfig {
    algorithm: CompressionAlgorithm::Brotli,  // Slow!
    ..Default::default()
};
}

3. Monitor and Measure

#![allow(unused)]
fn main() {
use std::time::Instant;

// ✅ Good: Measure actual performance
let start = Instant::now();
let result = process_file(path).await?;
let duration = start.elapsed();

let throughput_mb_s = (file_size as f64 / duration.as_secs_f64()) / (1024.0 * 1024.0);
info!("Throughput: {:.2} MB/s", throughput_mb_s);

// ❌ Bad: Assume performance without measurement
let result = process_file(path).await?;
}

4. Batch Small Operations

#![allow(unused)]
fn main() {
// ✅ Good: Batch to amortize overhead
tokio::task::spawn_blocking(move || {
    RAYON_POOLS.cpu_bound_pool().install(|| {
        chunks.into_par_iter()
            .map(|chunk| process_chunk(chunk))
            .collect::<Result<Vec<_>, _>>()
    })
}).await??;

// ❌ Bad: Spawn for each small operation
for chunk in chunks {
    tokio::task::spawn_blocking(move || {
        process_chunk(chunk)  // Excessive spawn overhead!
    }).await??;
}
}

5. Use Device-Specific Settings

#![allow(unused)]
fn main() {
// ✅ Good: Configure for storage type
let config = ResourceConfig {
    storage_type: StorageType::NVMe,  // 24 I/O tokens
    io_tokens: Some(24),
    ..Default::default()
};

// ❌ Bad: One size fits all
let config = ResourceConfig {
    io_tokens: Some(12),  // May be suboptimal
    ..Default::default()
};
}

Troubleshooting Performance Issues

Issue 1: Low Throughput Despite Low Resource Usage

Symptoms:

  • Throughput < 100 MB/s
  • CPU usage < 50%
  • I/O usage < 50%

Diagnosis:

#![allow(unused)]
fn main() {
// Check coordination overhead
let queue_depth = CONCURRENCY_METRICS.cpu_queue_depth();
let active_workers = CONCURRENCY_METRICS.active_workers();

println!("Queue: {}, Active: {}", queue_depth, active_workers);
}

Causes:

  • Too few workers (underutilization)
  • Small batch sizes (high spawn overhead)
  • Synchronous bottlenecks

Solutions:

#![allow(unused)]
fn main() {
// Increase workers
let workers = WorkerCount::new(available_cores);

// Batch operations
let batch_size = 100;
for batch in chunks.chunks(batch_size) {
    process_batch(batch).await?;
}
}

Issue 2: Inconsistent Performance

Symptoms:

  • Performance varies widely between runs
  • High P99 latencies (> 10x P50)

Diagnosis:

#![allow(unused)]
fn main() {
// Check wait time distribution
let p50 = CONCURRENCY_METRICS.cpu_wait_p50();
let p95 = CONCURRENCY_METRICS.cpu_wait_p95();
let p99 = CONCURRENCY_METRICS.cpu_wait_p99();

println!("Wait times: P50={}ms, P95={}ms, P99={}ms", p50, p95, p99);
}

Causes:

  • Resource contention (high wait times)
  • GC pauses or memory pressure
  • External system interference

Solutions:

#![allow(unused)]
fn main() {
// Reduce contention
let config = ResourceConfig {
    cpu_tokens: Some(available_cores - 2),  // Leave headroom
    ..Default::default()
};

// Monitor memory
if mem_util > 70.0 {
    chunk_size = ChunkSize::from_mb(chunk_size_mb / 2)?;
}
}

Issue 3: Memory Growth

Symptoms:

  • Memory usage grows over time
  • Eventually triggers OOM or swapping

Diagnosis:

#![allow(unused)]
fn main() {
// Track memory trends
let mem_start = CONCURRENCY_METRICS.memory_used_mb();
// ... process files ...
let mem_end = CONCURRENCY_METRICS.memory_used_mb();

if mem_end > mem_start * 1.5 {
    warn!("Memory grew {:.1}%", ((mem_end - mem_start) / mem_start) * 100.0);
}
}

Causes:

  • Memory leaks (improper cleanup)
  • Unbounded queues or buffers
  • Large chunk size with many workers

Solutions:

#![allow(unused)]
fn main() {
// Use RAII guards for cleanup
struct ChunkBuffer {
    data: Vec<u8>,
    _guard: MemoryGuard,
}

// Limit queue depth
let (tx, rx) = tokio::sync::mpsc::channel::<ChunkBuffer>(100);  // Bounded channel

// Reduce chunk size
let chunk_size = ChunkSize::from_mb(16)?;  // Smaller
}

Summary

The pipeline's performance optimization system provides:

  1. Adaptive Configuration: Automatic chunk size and worker count optimization
  2. Algorithm Selection: Choose algorithms based on speed/ratio priority
  3. Resource Governance: Prevent oversubscription with token limits
  4. Memory Efficiency: Bounded memory usage with streaming processing
  5. Comprehensive Monitoring: Real-time metrics and performance tracking

Key Takeaways:

  • Use adaptive configuration (ChunkSize::optimal_for_file_size, WorkerCount::optimal_for_processing_type)
  • Choose algorithms based on workload (LZ4 for speed, Brotli for ratio)
  • Monitor metrics regularly (CPU/I/O saturation, wait times, throughput)
  • Tune based on bottleneck (CPU: increase workers/faster algorithms, I/O: increase chunk size, Memory: reduce chunk/workers)
  • Benchmark and measure actual performance (don't assume)

Performance Goals Achieved:

  • ✅ Throughput: 100-500 MB/s (algorithm-dependent)
  • ✅ Latency: < 100 ms overhead for small files
  • ✅ Memory: Bounded usage (chunk_size × workers × files)
  • ✅ Scalability: Linear scaling up to available cores