Pipeline Stages

Version: 1.0
Date: October 08, 2025
SPDX-License-Identifier: BSD-3-Clause
License File: See the LICENSE file in the project root.
Copyright: © 2025 Michael Gardner, A Bit of Help, Inc.
Authors: Michael Gardner, Claude Code
Status: Active

What is a Stage?

A pipeline stage is a single processing operation that transforms data in a specific way. Each stage performs one well-defined task, like compressing data, encrypting it, or verifying its integrity.

Think of stages like workstations on an assembly line. Each workstation has specialized tools and performs one specific operation. The product moves from one workstation to the next until it's complete.

Stage Types

Our pipeline supports four main categories of stages:

1. Compression Stages

Compression stages reduce the size of your data. This is useful for:

  • Saving disk space
  • Reducing network bandwidth
  • Speeding up file transfers
  • Lowering storage costs

Available Compression Algorithms:

  • Brotli - Best compression ratio, slower speed

    • Best for: Text files, web content, logs
    • Performance: Excellent compression, moderate speed
    • Memory: Higher memory usage
  • Gzip - General-purpose compression

    • Best for: General files, wide compatibility
    • Performance: Good balance of speed and ratio
    • Memory: Moderate memory usage
  • Zstandard (zstd) - Modern, fast compression

    • Best for: Large files, real-time compression
    • Performance: Excellent speed and ratio
    • Memory: Efficient memory usage
  • LZ4 - Extremely fast compression

    • Best for: Real-time applications, live data streams
    • Performance: Fastest compression, moderate ratio
    • Memory: Low memory usage
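
As a rough guide, the choice comes down to what you optimize for. The helper below is a hypothetical sketch (not part of the pipeline API) that maps a simple priority onto the algorithm names used by compression stages in this chapter:

// Hypothetical helper (not part of the pipeline API): map a simple priority
// onto the algorithm names used by compression stages in this chapter.
enum CompressionPriority {
    SmallestOutput,       // archival data, logs, web content
    Balanced,             // general-purpose files
    FastestSpeed,         // real-time streams
    WidestCompatibility,  // interoperability with other tools
}

fn pick_compression_algorithm(priority: CompressionPriority) -> &'static str {
    match priority {
        CompressionPriority::SmallestOutput => "brotli",
        CompressionPriority::Balanced => "zstd",
        CompressionPriority::FastestSpeed => "lz4",
        CompressionPriority::WidestCompatibility => "gzip",
    }
}

fn main() {
    // A log-archival pipeline would favor the smallest output.
    assert_eq!(
        pick_compression_algorithm(CompressionPriority::SmallestOutput),
        "brotli"
    );
}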

2. Encryption Stages

Encryption stages protect your data by making it unreadable without the correct key. This is essential for:

  • Protecting sensitive information
  • Compliance with security regulations
  • Secure data transmission
  • Privacy protection

Available Encryption Algorithms:

  • AES-256-GCM - Industry standard encryption

    • Key Size: 256 bits (32 bytes)
    • Security: FIPS approved, very strong
    • Performance: Excellent with AES-NI hardware support
    • Authentication: Built-in integrity verification
  • ChaCha20-Poly1305 - Modern stream cipher

    • Key Size: 256 bits (32 bytes)
    • Security: Strong, constant-time implementation
    • Performance: Consistent across all platforms
    • Authentication: Built-in integrity verification
  • AES-128-GCM - Faster AES variant

    • Key Size: 128 bits (16 bytes)
    • Security: Still very secure
    • Performance: Faster than AES-256
    • Authentication: Built-in integrity verification
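
All three algorithms are authenticated (AEAD), so the practical choice is usually driven by hardware: AES-GCM shines when AES-NI is available, while ChaCha20-Poly1305 performs consistently everywhere. A hypothetical selection helper (not part of the pipeline API) might look like this:

// Hypothetical helper: choose an AEAD algorithm name based on CPU support.
// On x86_64 with AES-NI, AES-256-GCM is typically fastest; on other
// platforms ChaCha20-Poly1305 offers consistent software performance.
fn pick_encryption_algorithm() -> &'static str {
    #[cfg(target_arch = "x86_64")]
    {
        if std::arch::is_x86_feature_detected!("aes") {
            return "aes256gcm";
        }
    }
    "chacha20poly1305"
}

fn main() {
    println!("selected AEAD: {}", pick_encryption_algorithm());
}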

3. Integrity Verification Stages

Integrity stages ensure your data hasn't been corrupted or tampered with. They create a unique "fingerprint" of your data called a checksum or hash.

Available Hashing Algorithms:

  • SHA-256 - Industry standard hashing

    • Output: 256 bits (32 bytes)
    • Security: Cryptographically secure
    • Performance: Good balance
    • Use Case: General integrity verification
  • SHA-512 - Stronger SHA variant

    • Output: 512 bits (64 bytes)
    • Security: Stronger than SHA-256
    • Performance: Good on 64-bit systems
    • Use Case: High-security applications
  • BLAKE3 - Modern, high-performance hashing

    • Output: 256 bits (32 bytes)
    • Security: Strong security properties
    • Performance: Very fast
    • Use Case: High-performance applications
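
To make the idea concrete: a hash is a short, fixed-size digest computed over the whole input, and any change to the input yields a completely different digest. Independent of the pipeline API, here is how a SHA-256 fingerprint can be computed with the widely used sha2 crate (assumes sha2 is listed in Cargo.toml):

use sha2::{Digest, Sha256};

fn main() {
    let data = b"example payload";

    // Feed the data through the hasher and read out the 32-byte digest.
    let mut hasher = Sha256::new();
    hasher.update(data);
    let digest = hasher.finalize();

    // Render the digest as lowercase hex for display or storage.
    let hex: String = digest.iter().map(|b| format!("{:02x}", b)).collect();
    println!("sha256 = {hex}");
}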

4. Transform Stages

Transform stages modify or inspect data for specific purposes like debugging, monitoring, or data flow analysis. Unlike other stages, transforms focus on observability and diagnostics rather than data protection or size reduction.

Available Transform Stages:

  • Tee - Data splitting/inspection stage

    • Purpose: Copy data to secondary output while passing through
    • Use Cases: Debugging, monitoring, audit trails, data forking
    • Performance: Limited by I/O speed to tee output
    • Configuration:
      • output_path (required): Where to write teed data
      • format: binary (default), hex, or text
      • enabled: true (default) or false to disable
    • Example: Capture intermediate pipeline data for analysis
  • Debug - Diagnostic monitoring stage

    • Purpose: Monitor data flow with zero data modification
    • Use Cases: Pipeline debugging, performance analysis, corruption detection
    • Performance: Minimal overhead, pass-through operation
    • Configuration:
      • label (required): Unique identifier for metrics
    • Metrics: Emits Prometheus metrics (checksums, bytes, chunk counts)
    • Example: Detect where data corruption occurs in complex pipelines
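
Transform stages are configured through the same StageConfiguration API shown in the next section, with their options passed as string key-value pairs in the parameters map. A minimal sketch of a standalone tee stage (the output path is illustrative):

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::{PipelineStage, StageType, StageConfiguration};
use std::collections::HashMap;

// Tee stage: copy the data flowing through this point to a hex-formatted
// file while passing it along unchanged. The keys match the configuration
// options listed above; the output path is just an example.
let mut tee_params = HashMap::new();
tee_params.insert("output_path".to_string(), "/tmp/tee-output.hex".to_string());
tee_params.insert("format".to_string(), "hex".to_string());

let tee_stage = PipelineStage::new(
    "tee-inspect".to_string(),
    StageType::Transform,
    StageConfiguration::new("tee".to_string(), tee_params, false),
    1, // stage order
)?;
}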

Stage Configuration

Each stage has a configuration that specifies how it should process data:

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::{PipelineStage, StageType, StageConfiguration};
use std::collections::HashMap;

// Example: Compression stage
let compression_stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::new(
        "zstd".to_string(),  // algorithm name
        HashMap::new(),      // parameters
        false,               // parallel processing
    ),
    0, // stage order
)?;

// Example: Encryption stage
let encryption_stage = PipelineStage::new(
    "encrypt".to_string(),
    StageType::Encryption,
    StageConfiguration::new(
        "aes256gcm".to_string(),
        HashMap::new(),
        false,
    ),
    1, // stage order
)?;

// Example: Integrity verification stage
let integrity_stage = PipelineStage::new(
    "verify".to_string(),
    StageType::Checksum,
    StageConfiguration::new(
        "sha256".to_string(),
        HashMap::new(),
        false,
    ),
    2, // stage order
)?;
}
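
The parameters map carries algorithm-specific options as string key-value pairs. Which keys a stage understands depends on its implementation; the "level" key below is purely illustrative:

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::{PipelineStage, StageType, StageConfiguration};
use std::collections::HashMap;

// Hypothetical example: pass an algorithm-specific option through the
// parameters map. The "level" key is illustrative only; consult each stage's
// documentation for the keys it actually understands.
let mut params = HashMap::new();
params.insert("level".to_string(), "6".to_string());

let tuned_compression = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::new("zstd".to_string(), params, false),
    0, // stage order
)?;
}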

Stage Execution Order

Stages execute in the order you define them. The output of one stage becomes the input to the next stage.

Recommended Order for Processing:

  1. Compress (reduce size first)
  2. Encrypt (protect compressed data)
  3. Verify integrity (create checksum of encrypted data)

For Restoration (reverse order):

  1. Verify integrity (check encrypted data)
  2. Decrypt (recover compressed data)
  3. Decompress (restore original file)

Processing Pipeline:
Input File → Compress → Encrypt → Verify → Output File

Restoration Pipeline:
Input File → Verify → Decrypt → Decompress → Output File
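
Put differently, restoration walks the same stage list back to front, applying each stage's inverse operation. A minimal, pipeline-independent sketch of that ordering:

fn main() {
    // Processing applies stages front to back; restoration applies their
    // inverses back to front.
    let stages = ["compress", "encrypt", "verify"];

    println!("processing:  {}", stages.join(" -> "));

    let restoration: Vec<&str> = stages.iter().rev().copied().collect();
    println!("restoration: undo {}", restoration.join(" -> undo "));
}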

Combining Stages

You can combine stages in different ways depending on your needs:

Maximum Security

#![allow(unused)]
fn main() {
vec![
    PipelineStage::new(
        "compress".to_string(),
        StageType::Compression,
        StageConfiguration::new("brotli".to_string(), HashMap::new(), false),
        0,
    )?,
    PipelineStage::new(
        "encrypt".to_string(),
        StageType::Encryption,
        StageConfiguration::new("aes256gcm".to_string(), HashMap::new(), false),
        1,
    )?,
    PipelineStage::new(
        "verify".to_string(),
        StageType::Checksum,
        StageConfiguration::new("blake3".to_string(), HashMap::new(), false),
        2,
    )?,
]
}

Maximum Speed

#![allow(unused)]
fn main() {
vec![
    PipelineStage::new(
        "compress".to_string(),
        StageType::Compression,
        StageConfiguration::new("lz4".to_string(), HashMap::new(), false),
        0,
    )?,
    PipelineStage::new(
        "encrypt".to_string(),
        StageType::Encryption,
        StageConfiguration::new("chacha20poly1305".to_string(), HashMap::new(), false),
        1,
    )?,
]
}

Balanced Approach

#![allow(unused)]
fn main() {
vec![
    PipelineStage::new(
        "compress".to_string(),
        StageType::Compression,
        StageConfiguration::new("zstd".to_string(), HashMap::new(), false),
        0,
    )?,
    PipelineStage::new(
        "encrypt".to_string(),
        StageType::Encryption,
        StageConfiguration::new("aes256gcm".to_string(), HashMap::new(), false),
        1,
    )?,
    PipelineStage::new(
        "verify".to_string(),
        StageType::Checksum,
        StageConfiguration::new("sha256".to_string(), HashMap::new(), false),
        2,
    )?,
]
}

Debugging Pipeline

#![allow(unused)]
fn main() {
vec![
    PipelineStage::new(
        "debug-input".to_string(),
        StageType::Transform,
        StageConfiguration::new("debug".to_string(), {
            let mut params = HashMap::new();
            params.insert("label".to_string(), "input-data".to_string());
            params
        }, false),
        0,
    )?,
    PipelineStage::new(
        "compress".to_string(),
        StageType::Compression,
        StageConfiguration::new("zstd".to_string(), HashMap::new(), false),
        1,
    )?,
    PipelineStage::new(
        "tee-compressed".to_string(),
        StageType::Transform,
        StageConfiguration::new("tee".to_string(), {
            let mut params = HashMap::new();
            params.insert("output_path".to_string(), "/tmp/compressed.bin".to_string());
            params.insert("format".to_string(), "hex".to_string());
            params
        }, false),
        2,
    )?,
    PipelineStage::new(
        "encrypt".to_string(),
        StageType::Encryption,
        StageConfiguration::new("aes256gcm".to_string(), HashMap::new(), false),
        3,
    )?,
]
}

Parallel Processing

Stages process file chunks in parallel for better performance:

File Split into Chunks:
┌──────┬──────┬──────┬──────┐
│Chunk1│Chunk2│Chunk3│Chunk4│
└──┬───┴──┬───┴──┬───┴──┬───┘
   │      │      │      │
   ▼      ▼      ▼      ▼
   ┌──────┬──────┬──────┬──────┐
   │Stage1│Stage1│Stage1│Stage1│ (Parallel)
   └──┬───┴──┬───┴──┬───┴──┬───┘
      ▼      ▼      ▼      ▼
   ┌──────┬──────┬──────┬──────┐
   │Stage2│Stage2│Stage2│Stage2│ (Parallel)
   └──┬───┴──┬───┴──┬───┴──┬───┘
      │      │      │      │
      ▼      ▼      ▼      ▼
   Combined Output File

This parallel processing lets the pipeline use multiple CPU cores for higher throughput.
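
The scheduling happens inside the pipeline, but the concept can be sketched with the rayon crate: split the input into chunks and run the same stage function on every chunk across a thread pool. This is a simplified illustration, not the pipeline's actual implementation (assumes rayon in Cargo.toml):

use rayon::prelude::*;

// Stand-in for a real stage: here we just XOR every byte, but any
// chunk-local transformation (compress, encrypt, hash) fits the same shape.
fn apply_stage(chunk: &[u8]) -> Vec<u8> {
    chunk.iter().map(|b| b ^ 0xFF).collect()
}

fn main() {
    let data = vec![0u8; 4 * 1024 * 1024]; // 4 MiB of input
    let chunk_size = 1024 * 1024;          // 1 MiB chunks

    // Each chunk is processed on a worker thread; results come back in
    // their original order and are concatenated into the output buffer.
    let processed: Vec<Vec<u8>> = data
        .par_chunks(chunk_size)
        .map(apply_stage)
        .collect();
    let output: Vec<u8> = processed.concat();

    assert_eq!(output.len(), data.len());
}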

Stage Validation

The pipeline validates stages at creation time:

  • Algorithm compatibility: Ensures compression algorithms are only used in compression stages
  • Stage order: Verifies stages have unique, sequential order numbers
  • Configuration validity: Checks all stage parameters are valid
  • Dependency checks: Ensures restoration pipelines match processing pipelines

#![allow(unused)]
fn main() {
// This will fail - wrong algorithm for stage type
PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::new(
        "aes256gcm".to_string(), // ❌ Encryption algorithm in compression stage!
        HashMap::new(),
        false,
    ),
    0,
) // ❌ Error: Algorithm not compatible with stage type
}

Extending with Custom Stages

The pipeline can be extended with custom stages to meet your specific requirements: implement your own processing logic, integrate third-party tools, or add specialized transformations.

For detailed information on implementing custom stages, see Custom Stages in the Advanced Topics section.

Next Steps

Now that you understand pipeline stages, you can learn about: