Stage Processing

Version: 0.1.0
Date: October 08, 2025
SPDX-License-Identifier: BSD-3-Clause
License File: See the LICENSE file in the project root.
Copyright: © 2025 Michael Gardner, A Bit of Help, Inc.
Authors: Michael Gardner
Status: Draft

This chapter provides a comprehensive overview of the stage processing architecture in the adaptive pipeline system. Stages are the fundamental building blocks that transform data as it flows through a pipeline.


Overview

Stages are individual processing steps within a pipeline that transform file chunks as data flows from input to output. Each stage performs a specific operation such as compression, encryption, or integrity checking.

Key Characteristics

  • Type Safety: Strongly-typed stage operations prevent configuration errors
  • Ordering: Explicit ordering ensures predictable execution sequence
  • Lifecycle Management: Stages track creation and modification timestamps
  • State Management: Stages can be enabled/disabled without removal
  • Resource Awareness: Stages provide resource estimation and management

Stage Processing Architecture

┌─────────────────────────────────────────────────────────────┐
│                        Pipeline                             │
│                                                             │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐           │
│  │  Stage 1   │  │  Stage 2   │  │  Stage 3   │           │
│  │ Checksum   │→ │ Compress   │→ │  Encrypt   │→ Output  │
│  │ (Order 0)  │  │ (Order 1)  │  │ (Order 2)  │           │
│  └────────────┘  └────────────┘  └────────────┘           │
│        ↑               ↑               ↑                    │
│        └───────────────┴───────────────┘                    │
│              Stage Executor                                 │
└─────────────────────────────────────────────────────────────┘

Design Principles

  1. Domain-Driven Design: Stages are domain entities with identity
  2. Separation of Concerns: Configuration separated from execution
  3. Async-First: All operations are asynchronous for scalability
  4. Extensibility: New stage types can be added through configuration

Stage Types

The pipeline supports five distinct stage types, each optimized for different data transformation operations.

StageType Enum

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageType {
    /// Compression or decompression operations
    Compression,

    /// Encryption or decryption operations
    Encryption,

    /// Data transformation operations
    Transform,

    /// Checksum calculation and verification
    Checksum,

    /// Pass-through stage that doesn't modify data
    PassThrough,
}
}

Stage Type Details

Stage Type  | Purpose             | Examples                  | Typical Use Case
------------|---------------------|---------------------------|-------------------------------------
Compression | Reduce data size    | Brotli, Gzip, Zstd, Lz4   | Minimize storage/bandwidth
Encryption  | Secure data         | AES-256-GCM, ChaCha20     | Data protection
Transform   | Inspect/modify data | Tee, Debug                | Debugging, monitoring, data forking
Checksum    | Verify integrity    | SHA-256, SHA-512, Blake3  | Data validation
PassThrough | No modification     | Identity transform        | Testing/debugging

Parsing Stage Types

Stage types support case-insensitive parsing from strings:

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::pipeline_stage::StageType;
use std::str::FromStr;

// Parse from lowercase
let compression = StageType::from_str("compression").unwrap();
assert_eq!(compression, StageType::Compression);

// Case-insensitive parsing
let encryption = StageType::from_str("ENCRYPTION").unwrap();
assert_eq!(encryption, StageType::Encryption);

// Display format
assert_eq!(format!("{}", StageType::Checksum), "checksum");
}

Pattern Matching

#![allow(unused)]
fn main() {
fn describe_stage(stage_type: StageType) -> &'static str {
    match stage_type {
        StageType::Compression => "Reduces data size",
        StageType::Encryption => "Secures data",
        StageType::Transform => "Modifies data structure",
        StageType::Checksum => "Verifies data integrity",
        StageType::PassThrough => "No modification",
    }
}
}

Stage Entity

The PipelineStage is a domain entity that encapsulates a specific data transformation operation within a pipeline.

Entity Structure

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStage {
    id: StageId,
    name: String,
    stage_type: StageType,
    configuration: StageConfiguration,
    enabled: bool,
    order: u32,
    created_at: chrono::DateTime<chrono::Utc>,
    updated_at: chrono::DateTime<chrono::Utc>,
}
}

Entity Characteristics

  • Identity: Unique StageId persists through configuration changes
  • Name: Human-readable identifier (must not be empty)
  • Type: Strongly-typed operation (Compression, Encryption, etc.)
  • Configuration: Algorithm-specific parameters
  • Enabled Flag: Controls execution without removal
  • Order: Determines execution sequence (0-based)
  • Timestamps: Track creation and modification times

Creating a Stage

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::pipeline_stage::{PipelineStage, StageConfiguration, StageType};
use std::collections::HashMap;

let mut params = HashMap::new();
params.insert("level".to_string(), "6".to_string());

let config = StageConfiguration::new("brotli".to_string(), params, true);
let stage = PipelineStage::new(
    "compression".to_string(),
    StageType::Compression,
    config,
    0  // Order: execute first
).unwrap();

assert_eq!(stage.name(), "compression");
assert_eq!(stage.stage_type(), &StageType::Compression);
assert_eq!(stage.algorithm(), "brotli");
assert!(stage.is_enabled());
}

Modifying Stage State

#![allow(unused)]
fn main() {
let mut stage = PipelineStage::new(
    "checksum".to_string(),
    StageType::Checksum,
    StageConfiguration::default(),
    0,
).unwrap();

// Disable the stage temporarily
stage.set_enabled(false);
assert!(!stage.is_enabled());

// Update configuration
let mut new_params = HashMap::new();
new_params.insert("algorithm".to_string(), "sha512".to_string());
let new_config = StageConfiguration::new("sha512".to_string(), new_params, true);
stage.update_configuration(new_config);

// Change execution order
stage.update_order(2);
assert_eq!(stage.order(), 2);

// Re-enable the stage
stage.set_enabled(true);
}

Stage Configuration

Each stage has a configuration that specifies how data should be transformed.

Configuration Structure

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageConfiguration {
    pub algorithm: String,
    pub parameters: HashMap<String, String>,
    pub parallel_processing: bool,
    pub chunk_size: Option<usize>,
}
}

Configuration Parameters

Field               | Type          | Description                                  | Default
--------------------|---------------|----------------------------------------------|----------
algorithm           | String        | Algorithm name (e.g., "brotli", "aes256gcm") | "default"
parameters          | HashMap       | Algorithm-specific key-value parameters      | {}
parallel_processing | bool          | Enable parallel chunk processing             | true
chunk_size          | Option<usize> | Custom chunk size (1KB - 100MB)              | None

Compression Configuration

#![allow(unused)]
fn main() {
let mut params = HashMap::new();
params.insert("level".to_string(), "9".to_string());

let config = StageConfiguration::new(
    "zstd".to_string(),
    params,
    true,  // Enable parallel processing
);
}

Encryption Configuration

#![allow(unused)]
fn main() {
let mut params = HashMap::new();
params.insert("key_size".to_string(), "256".to_string());

let config = StageConfiguration::new(
    "aes256gcm".to_string(),
    params,
    false,  // Sequential processing for encryption
);
}

Default Configuration

#![allow(unused)]
fn main() {
let config = StageConfiguration::default();
// algorithm: "default"
// parameters: {}
// parallel_processing: true
// chunk_size: None
}

Stage Lifecycle

Stages progress through several lifecycle phases from creation to execution.

Lifecycle Phases

1. Creation
   ↓
2. Configuration
   ↓
3. Ordering
   ↓
4. Execution
   ↓
5. Monitoring

1. Creation Phase

Stages are created with initial configuration:

#![allow(unused)]
fn main() {
let stage = PipelineStage::new(
    "compression".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)?;
}

2. Configuration Phase

Parameters can be updated as needed:

#![allow(unused)]
fn main() {
stage.update_configuration(new_config);
// updated_at timestamp is automatically updated
}

3. Ordering Phase

Position in pipeline can be adjusted:

#![allow(unused)]
fn main() {
stage.update_order(1);
// Stage now executes second instead of first
}

4. Execution Phase

Stage processes data according to its configuration:

#![allow(unused)]
fn main() {
let executor: Arc<dyn StageExecutor> = /* ... */;
let result = executor.execute(&stage, chunk, &mut context).await?;
}

5. Monitoring Phase

Timestamps track when changes occur:

#![allow(unused)]
fn main() {
println!("Created: {}", stage.created_at());
println!("Last modified: {}", stage.updated_at());
}

Stage Execution Model

The stage executor processes file chunks through configured stages using two primary execution modes.

Single Chunk Processing

Process individual chunks sequentially:

#![allow(unused)]
fn main() {
async fn execute(
    &self,
    stage: &PipelineStage,
    chunk: FileChunk,
    context: &mut ProcessingContext,
) -> Result<FileChunk, PipelineError>
}

Execution Flow:

Input Chunk → Validate → Process → Update Context → Output Chunk
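
To make the flow concrete, the sketch below wires can_execute and execute together. The run_stage wrapper and the InvalidConfiguration error message are illustrative assumptions (the variant name is taken from the error examples later in this chapter), not the crate's API.

#![allow(unused)]
fn main() {
use std::sync::Arc;

// Illustrative only: `run_stage` is a hypothetical wrapper, not crate API.
// It validates the stage with can_execute, then delegates to execute, which
// processes the chunk and updates the context.
async fn run_stage(
    executor: Arc<dyn StageExecutor>,
    stage: &PipelineStage,
    chunk: FileChunk,
    context: &mut ProcessingContext,
) -> Result<FileChunk, PipelineError> {
    // Validate: confirm this executor supports the stage
    if !executor.can_execute(stage).await? {
        return Err(PipelineError::InvalidConfiguration(format!(
            "stage '{}' is not supported by this executor",
            stage.name()
        )));
    }
    // Process the chunk; the executor updates the context as it goes
    executor.execute(stage, chunk, context).await
}
}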

Parallel Processing

Process multiple chunks concurrently:

#![allow(unused)]
fn main() {
async fn execute_parallel(
    &self,
    stage: &PipelineStage,
    chunks: Vec<FileChunk>,
    context: &mut ProcessingContext,
) -> Result<Vec<FileChunk>, PipelineError>
}

Execution Flow:

Chunks: [1, 2, 3, 4]
         ↓  ↓  ↓  ↓
      ┌────┬───┬───┬────┐
      │ T1 │T2 │T3 │ T4 │  (Parallel threads)
      └────┴───┴───┴────┘
         ↓  ↓  ↓  ↓
Results: [1, 2, 3, 4]

Processing Context

The ProcessingContext maintains state during execution:

#![allow(unused)]
fn main() {
pub struct ProcessingContext {
    pub pipeline_id: String,
    pub stage_metrics: HashMap<String, StageMetrics>,
    pub checksums: HashMap<String, Vec<u8>>,
    // ... other context fields
}
}
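
As a small illustration of how the context might be inspected after a run, using only the public fields shown above (the real struct carries additional fields):

#![allow(unused)]
fn main() {
// Illustrative only: report on the public fields shown above.
fn report(context: &ProcessingContext) {
    println!("pipeline: {}", context.pipeline_id);
    for (stage_name, checksum) in &context.checksums {
        println!("{}: {}-byte checksum", stage_name, checksum.len());
    }
}
}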

Stage Executor Interface

The StageExecutor trait defines the contract for stage execution engines.

Trait Definition

#![allow(unused)]
fn main() {
#[async_trait]
pub trait StageExecutor: Send + Sync {
    /// Execute a stage on a single chunk
    async fn execute(
        &self,
        stage: &PipelineStage,
        chunk: FileChunk,
        context: &mut ProcessingContext,
    ) -> Result<FileChunk, PipelineError>;

    /// Execute a stage on multiple chunks in parallel
    async fn execute_parallel(
        &self,
        stage: &PipelineStage,
        chunks: Vec<FileChunk>,
        context: &mut ProcessingContext,
    ) -> Result<Vec<FileChunk>, PipelineError>;

    /// Validate if a stage can be executed
    async fn can_execute(&self, stage: &PipelineStage) -> Result<bool, PipelineError>;

    /// Get supported stage types
    fn supported_stage_types(&self) -> Vec<String>;

    /// Estimate processing time for a stage
    async fn estimate_processing_time(
        &self,
        stage: &PipelineStage,
        data_size: u64,
    ) -> Result<std::time::Duration, PipelineError>;

    /// Get resource requirements for a stage
    async fn get_resource_requirements(
        &self,
        stage: &PipelineStage,
        data_size: u64,
    ) -> Result<ResourceRequirements, PipelineError>;
}
}

BasicStageExecutor Implementation

The infrastructure layer provides a concrete implementation:

#![allow(unused)]
fn main() {
pub struct BasicStageExecutor {
    checksums: Arc<RwLock<HashMap<String, Sha256>>>,
    compression_service: Arc<dyn CompressionService>,
    encryption_service: Arc<dyn EncryptionService>,
}

impl BasicStageExecutor {
    pub fn new(
        compression_service: Arc<dyn CompressionService>,
        encryption_service: Arc<dyn EncryptionService>,
    ) -> Self {
        Self {
            checksums: Arc::new(RwLock::new(HashMap::new())),
            compression_service,
            encryption_service,
        }
    }
}
}

Supported Stage Types

The BasicStageExecutor supports:

  • Compression: Via CompressionService (Brotli, Gzip, Zstd, Lz4)
  • Encryption: Via EncryptionService (AES-256-GCM, ChaCha20-Poly1305)
  • Checksum: Via internal SHA-256 implementation
  • Transform: Via TeeService and DebugService (Tee, Debug)

Transform Stages Details

Transform stages provide observability and diagnostic capabilities without modifying the data flow.

Tee Stage

The Tee stage copies data to a secondary output while passing it through unchanged - similar to the Unix tee command.

Configuration:

#![allow(unused)]
fn main() {
let mut params = HashMap::new();
params.insert("output_path".to_string(), "/tmp/debug-output.bin".to_string());
params.insert("format".to_string(), "hex".to_string());  // binary, hex, or text
params.insert("enabled".to_string(), "true".to_string());

let tee_stage = PipelineStage::new(
    "tee-compressed".to_string(),
    StageType::Transform,
    StageConfiguration::new("tee".to_string(), params, false),
    1,
)?;
}

Use Cases:

  • Debugging: Capture intermediate pipeline data for analysis
  • Monitoring: Sample data at specific pipeline stages
  • Audit Trails: Record data flow for compliance
  • Data Forking: Split data to multiple destinations

Output Formats:

  • binary: Raw binary output (default)
  • hex: Hexadecimal dump with ASCII sidebar (like hexdump -C)
  • text: UTF-8 text (lossy conversion for non-UTF8)

Performance: Limited by I/O speed to the tee output file

Debug Stage

The Debug stage monitors data flow with zero modification, emitting Prometheus metrics for real-time observability.

Configuration:

#![allow(unused)]
fn main() {
let mut params = HashMap::new();
params.insert("label".to_string(), "after-compression".to_string());

let debug_stage = PipelineStage::new(
    "debug-compressed".to_string(),
    StageType::Transform,
    StageConfiguration::new("debug".to_string(), params, false),
    2,
)?;
}

Use Cases:

  • Pipeline Debugging: Identify where data corruption occurs
  • Performance Analysis: Measure throughput between stages
  • Corruption Detection: Calculate checksums at intermediate points
  • Production Monitoring: Real-time visibility into processing

Metrics Emitted (Prometheus):

  • debug_stage_checksum{label, chunk_id}: SHA256 of chunk data
  • debug_stage_bytes{label, chunk_id}: Bytes processed per chunk
  • debug_stage_chunks_total{label}: Total chunks processed

Performance: Minimal overhead, pass-through operation with checksum calculation

Transform Stage Positioning

Both Tee and Debug stages can be placed anywhere in the pipeline (StagePosition::Any) and are fully reversible. They are ideal for instrumenting a pipeline at key points, as in the example below:

#![allow(unused)]
fn main() {
// Example: Debug pipeline with monitoring at key points
vec![
    // Input checkpoint
    PipelineStage::new("debug-input".to_string(), StageType::Transform,
        debug_config("input"), 0)?,

    // Compression
    PipelineStage::new("compress".to_string(), StageType::Compression,
        compression_config(), 1)?,

    // Capture compressed data
    PipelineStage::new("tee-compressed".to_string(), StageType::Transform,
        tee_config("/tmp/compressed.bin", "hex"), 2)?,

    // Encryption
    PipelineStage::new("encrypt".to_string(), StageType::Encryption,
        encryption_config(), 3)?,

    // Output checkpoint
    PipelineStage::new("debug-output".to_string(), StageType::Transform,
        debug_config("output"), 4)?,
]
}

Compatibility and Ordering

Stages have compatibility rules that ensure optimal pipeline performance. The recommended ordering is:

1. Input Checksum (automatic)
   ↓
2. Compression (reduces data size)
   ↓
3. Encryption (secures compressed data)
   ↓
4. Output Checksum (automatic)

Rationale:

  • Compress before encrypting to reduce encrypted payload size
  • Checksum before compression to detect input corruption early
  • Checksum after encryption to verify output integrity

Compatibility Matrix

From \ To      | Compression | Encryption | Checksum | PassThrough | Transform
---------------|-------------|------------|----------|-------------|----------
Compression    | ❌ No       | ✅ Yes     | ✅ Yes   | ✅ Yes      | ⚠️ Rare
Encryption     | ❌ No       | ❌ No      | ✅ Yes   | ✅ Yes      | ❌ No
Checksum       | ✅ Yes      | ✅ Yes     | ✅ Yes   | ✅ Yes      | ✅ Yes
PassThrough    | ✅ Yes      | ✅ Yes     | ✅ Yes   | ✅ Yes      | ✅ Yes
Transform      | ✅ Yes      | ✅ Yes     | ✅ Yes   | ✅ Yes      | ⚠️ Depends

Legend:

  • ✅ Yes: Recommended combination
  • ❌ No: Not recommended (avoid duplication or inefficiency)
  • ⚠️ Rare/Depends: Context-dependent

Checking Compatibility

#![allow(unused)]
fn main() {
let compression = PipelineStage::new(
    "compression".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
).unwrap();

let encryption = PipelineStage::new(
    "encryption".to_string(),
    StageType::Encryption,
    StageConfiguration::default(),
    1,
).unwrap();

// Compression should come before encryption
assert!(compression.is_compatible_with(&encryption));
}

Compatibility Rules

The is_compatible_with method implements these rules (a sketch follows the list):

  1. Compression → Encryption: ✅ Compress first, then encrypt
  2. Compression → Compression: ❌ Avoid double compression
  3. Encryption → Encryption: ❌ Avoid double encryption
  4. Encryption → Compression: ❌ Cannot compress encrypted data effectively
  5. PassThrough → Any: ✅ No restrictions
  6. Checksum → Any: ✅ Checksums compatible with everything
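
A minimal sketch of how these pairwise rules could be expressed as a match on stage types; the actual is_compatible_with implementation may differ in detail (for example, Transform combinations are context-dependent, as the matrix above shows):

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::pipeline_stage::StageType;

// Sketch only: pairwise compatibility expressed as a match on stage types.
fn compatible(from: StageType, to: StageType) -> bool {
    use StageType::*;
    match (from, to) {
        // Rules 2-3: avoid double compression or double encryption
        (Compression, Compression) | (Encryption, Encryption) => false,
        // Rule 4: encrypted data does not compress effectively
        (Encryption, Compression) => false,
        // Rule 1: compress first, then encrypt
        (Compression, Encryption) => true,
        // Rules 5-6: checksums and pass-through combine with anything
        (Checksum, _) | (PassThrough, _) => true,
        // Remaining combinations (including Transform) allowed by default here
        _ => true,
    }
}

assert!(compatible(StageType::Compression, StageType::Encryption));
assert!(!compatible(StageType::Encryption, StageType::Compression));
}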

Resource Management

Stages provide resource estimation and requirements to enable efficient execution planning.

Resource Requirements

#![allow(unused)]
fn main() {
#[derive(Debug, Clone)]
pub struct ResourceRequirements {
    pub memory_bytes: u64,
    pub cpu_cores: u32,
    pub disk_space_bytes: u64,
    pub network_bandwidth_bps: Option<u64>,
    pub gpu_memory_bytes: Option<u64>,
    pub estimated_duration: std::time::Duration,
}
}

Default Requirements

#![allow(unused)]
fn main() {
ResourceRequirements::default()
// memory_bytes: 64 MB
// cpu_cores: 1
// disk_space_bytes: 0
// network_bandwidth_bps: None
// gpu_memory_bytes: None
// estimated_duration: 1 second
}

Custom Requirements

#![allow(unused)]
fn main() {
let requirements = ResourceRequirements::new(
    128 * 1024 * 1024,  // 128 MB memory
    4,                   // 4 CPU cores
    1024 * 1024 * 1024, // 1 GB disk space
)
.with_duration(Duration::from_secs(30))
.with_network_bandwidth(100_000_000); // 100 Mbps
}

Estimating Resources

#![allow(unused)]
fn main() {
let executor: Arc<dyn StageExecutor> = /* ... */;
let requirements = executor.get_resource_requirements(
    &stage,
    10 * 1024 * 1024,  // 10 MB data size
).await?;

println!("Memory required: {}", Byte::from_bytes(requirements.memory_bytes));
println!("CPU cores: {}", requirements.cpu_cores);
println!("Estimated time: {:?}", requirements.estimated_duration);
}

Scaling Requirements

#![allow(unused)]
fn main() {
let mut requirements = ResourceRequirements::default();
requirements.scale(2.0);  // Double all requirements
}

Merging Requirements

#![allow(unused)]
fn main() {
let mut req1 = ResourceRequirements::default();
let req2 = ResourceRequirements::new(256_000_000, 2, 0);
req1.merge(&req2);  // Takes maximum of each field
}

Usage Examples

Example 1: Creating a Compression Stage

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::pipeline_stage::{PipelineStage, StageConfiguration, StageType};
use std::collections::HashMap;

let mut params = HashMap::new();
params.insert("level".to_string(), "9".to_string());

let config = StageConfiguration::new(
    "zstd".to_string(),
    params,
    true,  // Enable parallel processing
);

let compression_stage = PipelineStage::new(
    "fast-compression".to_string(),
    StageType::Compression,
    config,
    1,  // Execute after input checksum (order 0)
)?;

println!("Created stage: {}", compression_stage.name());
println!("Algorithm: {}", compression_stage.algorithm());
}

Example 2: Creating an Encryption Stage

#![allow(unused)]
fn main() {
let mut params = HashMap::new();
params.insert("key_size".to_string(), "256".to_string());

let config = StageConfiguration::new(
    "aes256gcm".to_string(),
    params,
    false,  // Sequential processing for security
);

let encryption_stage = PipelineStage::new(
    "secure-encryption".to_string(),
    StageType::Encryption,
    config,
    2,  // Execute after compression
)?;
}

Example 3: Building a Complete Pipeline

#![allow(unused)]
fn main() {
let mut stages = Vec::new();

// Stage 0: Input checksum
let checksum_in = PipelineStage::new(
    "input-checksum".to_string(),
    StageType::Checksum,
    StageConfiguration::new("sha256".to_string(), HashMap::new(), true),
    0,
)?;
stages.push(checksum_in);

// Stage 1: Compression
let mut compress_params = HashMap::new();
compress_params.insert("level".to_string(), "6".to_string());
let compression = PipelineStage::new(
    "compression".to_string(),
    StageType::Compression,
    StageConfiguration::new("brotli".to_string(), compress_params, true),
    1,
)?;
stages.push(compression);

// Stage 2: Encryption
let mut encrypt_params = HashMap::new();
encrypt_params.insert("key_size".to_string(), "256".to_string());
let encryption = PipelineStage::new(
    "encryption".to_string(),
    StageType::Encryption,
    StageConfiguration::new("aes256gcm".to_string(), encrypt_params, false),
    2,
)?;
stages.push(encryption);

// Stage 3: Output checksum
let checksum_out = PipelineStage::new(
    "output-checksum".to_string(),
    StageType::Checksum,
    StageConfiguration::new("sha256".to_string(), HashMap::new(), true),
    3,
)?;
stages.push(checksum_out);

// Validate compatibility
for i in 0..stages.len() - 1 {
    assert!(stages[i].is_compatible_with(&stages[i + 1]));
}
}

Example 4: Executing a Stage

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::repositories::stage_executor::StageExecutor;

let executor: Arc<dyn StageExecutor> = /* ... */;
let stage = /* ... */;
let chunk = FileChunk::new(0, vec![1, 2, 3, 4, 5]);
let mut context = ProcessingContext::new("pipeline-123");

// Execute single chunk
let result = executor.execute(&stage, chunk, &mut context).await?;

println!("Processed {} bytes", result.data().len());
}

Example 5: Parallel Execution

#![allow(unused)]
fn main() {
let chunks = vec![
    FileChunk::new(0, vec![1, 2, 3]),
    FileChunk::new(1, vec![4, 5, 6]),
    FileChunk::new(2, vec![7, 8, 9]),
];

let results = executor.execute_parallel(&stage, chunks, &mut context).await?;

println!("Processed {} chunks", results.len());
}

Performance Considerations

Chunk Size Selection

Chunk size significantly impacts stage performance:

Data Size     | Recommended Chunk Size | Rationale
--------------|------------------------|--------------------------
< 10 MB       | 1 MB                   | Minimize overhead
10-100 MB     | 2-4 MB                 | Balance memory/IO
100 MB - 1 GB | 4-8 MB                 | Optimize parallelization
> 1 GB        | 8-16 MB                | Maximize throughput

#![allow(unused)]
fn main() {
let mut config = StageConfiguration::default();
config.chunk_size = Some(4 * 1024 * 1024);  // 4 MB chunks
}
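
As a worked illustration of the table, a helper along these lines could derive a chunk size from the input size. The function and its thresholds are hypothetical; they simply mirror the upper bound of each recommended range:

#![allow(unused)]
fn main() {
// Hypothetical helper mirroring the chunk-size table; not part of the crate.
fn recommended_chunk_size(data_size: u64) -> usize {
    const MB: u64 = 1024 * 1024;
    let chunk_mb: u64 = match data_size {
        s if s < 10 * MB => 1,      // < 10 MB
        s if s < 100 * MB => 4,     // 10-100 MB
        s if s < 1024 * MB => 8,    // 100 MB - 1 GB
        _ => 16,                    // > 1 GB
    };
    (chunk_mb * MB) as usize
}

let mut config = StageConfiguration::default();
config.chunk_size = Some(recommended_chunk_size(500 * 1024 * 1024)); // 8 MB chunks
}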

Parallel Processing

Enable parallel processing for CPU-bound operations:

#![allow(unused)]
fn main() {
// Compression: parallel processing beneficial
let compress_config = StageConfiguration::new(
    "zstd".to_string(),
    HashMap::new(),
    true,  // Enable parallel
);

// Encryption: sequential often better for security
let encrypt_config = StageConfiguration::new(
    "aes256gcm".to_string(),
    HashMap::new(),
    false,  // Disable parallel
);
}

Stage Ordering Impact

Optimal:

Checksum → Compress (6:1 ratio) → Encrypt → Checksum
1 GB → 1 GB → 167 MB → 167 MB → 167 MB

Suboptimal:

Checksum → Encrypt → Compress (1.1:1 ratio) → Checksum
1 GB → 1 GB → 1 GB → 909 MB → 909 MB

Encrypting before compression reduces the compression ratio from 6:1 to roughly 1.1:1, because encrypted output has high entropy and is effectively incompressible.

Memory Usage

Per-stage memory usage:

Stage Type  | Memory per Chunk  | Notes
------------|-------------------|----------------------
Compression | 2-3x chunk size   | Compression buffers
Encryption  | 1-1.5x chunk size | Encryption overhead
Checksum    | ~256 bytes        | Hash state only
PassThrough | 1x chunk size     | No additional memory
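
Combining the per-chunk multipliers above with the number of chunks in flight gives a rough upper bound on a stage's memory footprint. The sketch below uses the table's approximate figures and treats Transform like PassThrough; it is an estimate, not the implementation's accounting:

#![allow(unused)]
fn main() {
use adaptive_pipeline_domain::entities::pipeline_stage::StageType;

// Rough estimate: per-chunk memory (from the table) times chunks in flight.
fn estimated_stage_memory(stage_type: StageType, chunk_size: u64, chunks_in_flight: u64) -> u64 {
    let per_chunk = match stage_type {
        StageType::Compression => chunk_size * 3,     // 2-3x, worst case
        StageType::Encryption => chunk_size * 3 / 2,  // 1-1.5x, worst case
        StageType::Checksum => 256,                   // hash state only
        // Transform is not in the table; assume pass-through behavior (~1x)
        StageType::Transform | StageType::PassThrough => chunk_size,
    };
    per_chunk * chunks_in_flight
}

// Example: compressing 4 MB chunks with 8 chunks in flight ≈ 96 MB.
let bytes = estimated_stage_memory(StageType::Compression, 4 * 1024 * 1024, 8);
assert_eq!(bytes, 96 * 1024 * 1024);
}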

CPU Utilization

CPU-intensive stages:

  1. Compression: High CPU usage (especially Brotli level 9+)
  2. Encryption: Moderate CPU usage (AES-NI acceleration helps)
  3. Checksum: Low CPU usage (Blake3 faster than SHA-256)

Best Practices

1. Stage Naming

Use descriptive, kebab-case names:

#![allow(unused)]
fn main() {
// ✅ Good
let good = ["input-checksum", "fast-compression", "secure-encryption"];

// ❌ Bad
let bad = ["stage1", "s", "MyStage"];
}

2. Configuration Validation

Always validate configurations:

#![allow(unused)]
fn main() {
let stage = PipelineStage::new(/* ... */)?;
stage.validate()?;  // Validate before execution
}

3. Optimal Ordering

Follow the recommended order:

1. Input Checksum
2. Compression
3. Encryption
4. Output Checksum

4. Enable/Disable vs. Remove

Prefer disabling over removing stages:

#![allow(unused)]
fn main() {
// ✅ Good: Preserve configuration
stage.set_enabled(false);

// ❌ Bad: Lose configuration
stages.retain(|s| s.name() != "compression");
}

5. Resource Estimation

Estimate resources before execution:

#![allow(unused)]
fn main() {
let requirements = executor.get_resource_requirements(&stage, file_size).await?;

if requirements.memory_bytes > available_memory {
    // Adjust chunk size or process sequentially
}
}

6. Error Handling

Handle stage-specific errors appropriately:

#![allow(unused)]
fn main() {
match executor.execute(&stage, chunk, &mut context).await {
    Ok(result) => { /* success */ },
    Err(PipelineError::CompressionFailed(msg)) => {
        // Handle compression errors
    },
    Err(PipelineError::EncryptionFailed(msg)) => {
        // Handle encryption errors
    },
    Err(e) => {
        // Handle generic errors
    },
}
}

7. Monitoring

Track stage execution metrics:

#![allow(unused)]
fn main() {
let start = Instant::now();
let result = executor.execute(&stage, chunk, &mut context).await?;
let duration = start.elapsed();

println!("Stage '{}' processed {} bytes in {:?}",
    stage.name(),
    result.data().len(),
    duration
);
}

8. Testing

Test stages in isolation:

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_compression_stage() {
    let stage = create_compression_stage();
    let executor = create_test_executor();
    let chunk = FileChunk::new(0, vec![0u8; 1024]);
    let mut context = ProcessingContext::new("test");

    let result = executor.execute(&stage, chunk, &mut context).await.unwrap();

    assert!(result.data().len() < 1024);  // Compression worked
}
}

Troubleshooting

Issue 1: Stage Validation Fails

Symptom:

Error: InvalidConfiguration("Stage name cannot be empty")

Solution:

#![allow(unused)]
fn main() {
// Ensure stage name is not empty
let stage = PipelineStage::new(
    "my-stage".to_string(),  // ✅ Non-empty name
    stage_type,
    config,
    order,
)?;
}

Issue 2: Incompatible Stage Order

Symptom:

Error: IncompatibleStages("Cannot encrypt before compressing")

Solution:

#![allow(unused)]
fn main() {
// Check compatibility before adding stages
if !previous_stage.is_compatible_with(&new_stage) {
    // Reorder stages
}
}

Issue 3: Chunk Size Validation Error

Symptom:

Error: InvalidConfiguration("Chunk size must be between 1KB and 100MB")

Solution:

#![allow(unused)]
fn main() {
let mut config = StageConfiguration::default();
config.chunk_size = Some(4 * 1024 * 1024);  // ✅ 4 MB (valid range)
// config.chunk_size = Some(512);  // ❌ Too small (< 1KB)
// config.chunk_size = Some(200_000_000);  // ❌ Too large (> 100MB)
}

Issue 4: Out of Memory During Execution

Symptom:

Error: ResourceExhaustion("Insufficient memory for stage execution")

Solution:

#![allow(unused)]
fn main() {
// Reduce chunk size or disable parallel processing
let mut config = stage.configuration().clone();
config.chunk_size = Some(1 * 1024 * 1024);  // Reduce to 1 MB
config.parallel_processing = false;  // Disable parallel
stage.update_configuration(config);
}

Issue 5: Stage Executor Not Found

Symptom:

Error: ExecutorNotFound("No executor for stage type 'CustomStage'")

Solution:

#![allow(unused)]
fn main() {
// Check supported stage types
let supported = executor.supported_stage_types();
println!("Supported: {:?}", supported);

// Use a supported stage type
let stage = PipelineStage::new(
    "compression".to_string(),
    StageType::Compression,  // ✅ Supported type
    config,
    0,
)?;
}

Issue 6: Performance Degradation

Symptom: Stage execution is slower than expected.

Diagnosis:

#![allow(unused)]
fn main() {
let requirements = executor.get_resource_requirements(&stage, file_size).await?;
let duration = executor.estimate_processing_time(&stage, file_size).await?;

println!("Expected duration: {:?}", duration);
println!("Memory needed: {}", Byte::from_bytes(requirements.memory_bytes));
}

Solutions:

  • Enable parallel processing for compression stages
  • Increase chunk size for large files
  • Use faster algorithms (e.g., Lz4 instead of Brotli)
  • Check system resource availability

Testing Strategies

Unit Tests

Test individual stage operations:

#![allow(unused)]
fn main() {
#[test]
fn test_stage_creation() {
    let stage = PipelineStage::new(
        "test-stage".to_string(),
        StageType::Compression,
        StageConfiguration::default(),
        0,
    );
    assert!(stage.is_ok());
}

#[test]
fn test_stage_validation() {
    let stage = PipelineStage::new(
        "".to_string(),  // Empty name
        StageType::Compression,
        StageConfiguration::default(),
        0,
    );
    assert!(stage.is_err());
}
}

Integration Tests

Test stage execution with real executors:

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_compression_integration() {
    let compression_service = create_compression_service();
    let encryption_service = create_encryption_service();
    let executor = BasicStageExecutor::new(compression_service, encryption_service);

    let stage = create_compression_stage();
    let chunk = FileChunk::new(0, vec![0u8; 10000]);
    let mut context = ProcessingContext::new("test-pipeline");

    let result = executor.execute(&stage, chunk, &mut context).await.unwrap();

    assert!(result.data().len() < 10000);  // Verify compression
}
}

Property-Based Tests

Test stage invariants:

#![allow(unused)]
fn main() {
#[quickcheck]
fn stage_order_preserved(order: u32) -> bool {
    let stage = PipelineStage::new(
        "test".to_string(),
        StageType::Checksum,
        StageConfiguration::default(),
        order,
    ).unwrap();

    stage.order() == order
}
}

Compatibility Tests

Test stage compatibility matrix:

#![allow(unused)]
fn main() {
#[test]
fn test_compression_encryption_compatibility() {
    let compression = create_stage(StageType::Compression, 0);
    let encryption = create_stage(StageType::Encryption, 1);

    assert!(compression.is_compatible_with(&encryption));
    assert!(encryption.is_compatible_with(&create_stage(StageType::Checksum, 2)));
}
}

Next Steps

After understanding stage processing fundamentals, explore specific implementations:

Detailed Stage Implementations

  1. Compression: Deep dive into compression algorithms and performance tuning
  2. Encryption: Encryption implementation, key management, and security considerations
  3. Integrity Checking: Checksum algorithms and verification strategies
  • Data Persistence: How stages are persisted and retrieved from the database
  • File I/O: File chunking and binary format for stage data
  • Observability: Monitoring stage execution and performance


Summary

Key Takeaways:

  1. Stages are the fundamental building blocks of pipelines, each performing a specific transformation
  2. Five stage types are supported: Compression, Encryption, Transform, Checksum, PassThrough
  3. PipelineStage is a domain entity with identity, configuration, and lifecycle management
  4. Stage compatibility rules ensure optimal ordering (compress before encrypt)
  5. StageExecutor trait provides async execution with resource estimation
  6. Resource management enables efficient execution planning and monitoring
  7. Best practices include proper naming, validation, and error handling

Configuration File Reference: pipeline/src/domain/entities/pipeline_stage.rs
Executor Interface: pipeline-domain/src/repositories/stage_executor.rs:156
Executor Implementation: pipeline/src/infrastructure/repositories/stage_executor.rs:175