Concurrency Model
Version: 0.1.0
Date: October 08, 2025
SPDX-License-Identifier: BSD-3-Clause
License File: See the LICENSE file in the project root.
Copyright: © 2025 Michael Gardner, A Bit of Help, Inc.
Authors: Michael Gardner
Status: Draft
This chapter provides a comprehensive overview of the concurrency model in the adaptive pipeline system. Learn how async/await, the Tokio runtime, and concurrent patterns enable high-performance, scalable file processing.
Table of Contents
- Overview
- Concurrency Architecture
- Async/Await Model
- Tokio Runtime
- Parallel Chunk Processing
- Concurrency Primitives
- Thread Pools and Workers
- Resource Management
- Concurrency Patterns
- Performance Considerations
- Best Practices
- Troubleshooting
- Next Steps
Overview
The concurrency model enables the adaptive pipeline to process files efficiently through parallel processing, async I/O, and concurrent chunk handling. The system uses Rust's async/await with the Tokio runtime for high-performance, scalable concurrency.
Key Features
- Async/Await: Non-blocking asynchronous operations
- Tokio Runtime: Multi-threaded async runtime
- Parallel Processing: Concurrent chunk processing
- Worker Pools: Configurable thread pools
- Resource Management: Efficient resource allocation and cleanup
- Thread Safety: Safe concurrent access through Rust's type system
Concurrency Stack
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ - Pipeline orchestration │
│ - File processing coordination │
└─────────────────────────────────────────────────────────────┘
↓ async
┌─────────────────────────────────────────────────────────────┐
│ Tokio Runtime │
│ - Multi-threaded work-stealing scheduler │
│ - Async task execution │
│ - I/O reactor │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Concurrency Primitives │
│ ┌─────────┬──────────┬──────────┬─────────────┐ │
│ │ Mutex │ RwLock │ Semaphore│ Channel │ │
│ │ (Sync) │ (Shared) │ (Limit) │ (Message) │ │
│ └─────────┴──────────┴──────────┴─────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Worker Threads │
│ - Chunk processing workers │
│ - I/O workers │
│ - Background tasks │
└─────────────────────────────────────────────────────────────┘
Design Principles
- Async-First: All I/O operations are asynchronous
- Structured Concurrency: Clear task ownership and lifetimes
- Safe Sharing: Thread-safe sharing through Arc and sync primitives
- Resource Bounded: Limited resource usage with semaphores
- Zero-Cost Abstractions: Minimal overhead from async runtime
Concurrency Architecture
The system uses a layered concurrency architecture with clear separation between sync and async code.
Architectural Layers
┌─────────────────────────────────────────────────────────────┐
│ Async Layer (I/O Bound) │
│ ┌──────────────────────────────────────────────────┐ │
│ │ File I/O Service (async) │ │
│ │ - tokio::fs file operations │ │
│ │ - Async read/write │ │
│ └──────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Pipeline Service (async orchestration) │ │
│ │ - Async workflow coordination │ │
│ │ - Task spawning and management │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Sync Layer (CPU Bound) │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Compression Service (sync) │ │
│ │ - CPU-bound compression algorithms │ │
│ │ - No async overhead │ │
│ └──────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Encryption Service (sync) │ │
│ │ - CPU-bound encryption algorithms │ │
│ │ - No async overhead │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Async vs Sync Decision
Async for:
- File I/O (tokio::fs)
- Network I/O
- Database operations
- Long-running waits
Sync for:
- CPU-bound compression
- CPU-bound encryption
- Hash calculations
- Pure computation
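In practice, the two layers meet via `tokio::task::spawn_blocking`: CPU-bound sync services run on Tokio's blocking thread pool, so async worker threads stay free for I/O. A minimal sketch of the bridge (the `compress_chunk_sync` helper is hypothetical; the pattern is elaborated under Best Practices below):

```rust
// Hypothetical sync, CPU-bound service call from the sync layer
fn compress_chunk_sync(chunk: FileChunk) -> Result<FileChunk, PipelineError> {
    // ... run the compression algorithm ...
    todo!()
}

// Async wrapper: bridges the sync layer into the async layer without
// tying up a runtime worker thread
async fn compress_chunk(chunk: FileChunk) -> Result<FileChunk, PipelineError> {
    tokio::task::spawn_blocking(move || compress_chunk_sync(chunk))
        .await
        .expect("blocking compression task panicked")
}
```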
Async/Await Model
The system uses Rust's async/await for non-blocking concurrency.
Async Functions
```rust
// Async function definition
async fn process_file(
    path: &Path,
    chunk_size: ChunkSize,
) -> Result<Vec<FileChunk>, PipelineError> {
    // Await async operations
    let chunks = read_file_chunks(path, chunk_size).await?;

    // Process chunks
    let results = process_chunks_parallel(chunks).await?;

    Ok(results)
}
```
Awaiting Futures
```rust
// Sequential awaits
let chunks = service.read_file_chunks(path, chunk_size).await?;
let processed = process_chunks(chunks).await?;
service.write_file_chunks(output_path, processed).await?;

// Parallel awaits with try_join!
use tokio::try_join;

let (chunks1, chunks2) = try_join!(
    service.read_file_chunks(path1, chunk_size),
    service.read_file_chunks(path2, chunk_size),
)?;
```
Async Traits
```rust
use async_trait::async_trait;

#[async_trait]
pub trait FileIOService: Send + Sync {
    async fn read_file_chunks(
        &self,
        path: &Path,
        chunk_size: ChunkSize,
    ) -> Result<Vec<FileChunk>, PipelineError>;

    async fn write_file_chunks(
        &self,
        path: &Path,
        chunks: Vec<FileChunk>,
    ) -> Result<(), PipelineError>;
}
```
Tokio Runtime
The system uses Tokio's multi-threaded runtime for async execution.
Runtime Configuration
```rust
use tokio::runtime::Runtime;

// Multi-threaded runtime (default)
let runtime = Runtime::new()?;

// Custom configuration
let runtime = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(8)                  // 8 worker threads
    .thread_name("pipeline-worker")
    .thread_stack_size(3 * 1024 * 1024) // 3 MB stack
    .enable_all()                       // Enable I/O and time drivers
    .build()?;

// Execute async work on the runtime
runtime.block_on(process_file(path, chunk_size))?;
```
Runtime Selection
```rust
// Multi-threaded runtime (CPU-bound + I/O)
#[tokio::main]
async fn main() {
    // Automatically uses the multi-threaded runtime
    process_pipeline().await;
}

// Current-thread runtime (testing, single-threaded)
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Single-threaded runtime
    process_pipeline().await;
}
```
Work-Stealing Scheduler
Tokio uses a work-stealing scheduler for load balancing:
Thread 1: [Task A] [Task B] ────────> (idle, steals Task D)
Thread 2: [Task C] [Task D] [Task E] (busy)
Thread 3: [Task F] ────────────────> (idle, steals Task E)
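You can observe work-stealing directly by spawning more tasks than worker threads and printing which OS thread each task lands on. A minimal, self-contained sketch (illustrative only, not pipeline code; assumes tokio with the `full` feature, and output varies from run to run):

```rust
use std::time::Duration;

#[tokio::main(flavor = "multi_thread", worker_threads = 3)]
async fn main() {
    let mut handles = Vec::new();
    for i in 0..9 {
        handles.push(tokio::spawn(async move {
            // Report which worker thread picked this task up
            println!("task {i} on {:?}", std::thread::current().id());
            tokio::time::sleep(Duration::from_millis(10)).await;
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}
```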
Parallel Chunk Processing
Chunks are processed concurrently for maximum throughput.
Parallel Processing with try_join_all
```rust
use futures::future::try_join_all;

async fn process_chunks_parallel(
    chunks: Vec<FileChunk>,
) -> Result<Vec<FileChunk>, PipelineError> {
    // Spawn a task for each chunk
    let futures = chunks.into_iter().map(|chunk| {
        tokio::spawn(async move { process_chunk(chunk).await })
    });

    // Wait for all tasks to complete (`?` converts any JoinError)
    let results = try_join_all(futures).await?;

    // Collect per-chunk results, propagating the first processing error
    Ok(results.into_iter().collect::<Result<Vec<_>, _>>()?)
}
```
Bounded Parallelism
```rust
use std::sync::Arc;
use futures::future::try_join_all;
use tokio::sync::Semaphore;

async fn process_with_limit(
    chunks: Vec<FileChunk>,
    max_parallel: usize,
) -> Result<Vec<FileChunk>, PipelineError> {
    let semaphore = Arc::new(Semaphore::new(max_parallel));

    let futures = chunks.into_iter().map(|chunk| {
        let semaphore = Arc::clone(&semaphore);
        async move {
            // Hold a permit for the duration of the chunk's processing
            let _guard = semaphore.acquire().await.unwrap();
            process_chunk(chunk).await
        }
    });

    try_join_all(futures).await
}
```
Pipeline Parallelism
```rust
use futures::stream::StreamExt;

// Stage 1: Read chunks as a stream
let chunks = read_chunks_stream(path, chunk_size);

// Stage 2: Process chunks (parallel)
let processed = chunks
    .map(|chunk| async move {
        tokio::spawn(async move { compress_chunk(chunk).await }).await
    })
    .buffer_unordered(8); // Up to 8 chunks in flight

// Stage 3: Write chunks
write_chunks_stream(processed).await?;
```
Concurrency Primitives
The system uses Tokio's async-aware concurrency primitives.
Async Mutex
```rust
use std::sync::Arc;
use tokio::sync::Mutex;

let shared_state = Arc::new(Mutex::new(HashMap::new()));

// Acquire the lock asynchronously
let mut state = shared_state.lock().await;
state.insert(key, value);
// Lock is released automatically when `state` is dropped
```
Async RwLock
```rust
use std::sync::Arc;
use tokio::sync::RwLock;

let config = Arc::new(RwLock::new(PipelineConfig::default()));

// Multiple concurrent readers
let config_read = config.read().await;
let chunk_size = config_read.chunk_size;
drop(config_read); // Release the read lock before writing

// Single writer (blocks until all readers are done)
let mut config_write = config.write().await;
config_write.chunk_size = ChunkSize::from_mb(16)?;
```
Channels (mpsc)
```rust
use tokio::sync::mpsc;

// Create a bounded channel
let (tx, mut rx) = mpsc::channel::<FileChunk>(100);

// Producer: send chunks
tokio::spawn(async move {
    for chunk in chunks {
        tx.send(chunk).await.unwrap();
    }
});

// Consumer: receive chunks
while let Some(chunk) = rx.recv().await {
    process_chunk(chunk).await?;
}
```
Semaphores
```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Limit concurrent operations
let semaphore = Arc::new(Semaphore::new(4)); // Max 4 concurrent

for chunk in chunks {
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    tokio::spawn(async move {
        let result = process_chunk(chunk).await;
        drop(permit); // Release the permit
        result
    });
}
```
Thread Pools and Workers
Worker pools manage concurrent task execution.
Worker Pool Configuration
```rust
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Semaphore;

pub struct WorkerPool {
    max_workers: usize,
    semaphore: Arc<Semaphore>,
}

impl WorkerPool {
    pub fn new(max_workers: usize) -> Self {
        Self {
            max_workers,
            semaphore: Arc::new(Semaphore::new(max_workers)),
        }
    }

    pub async fn execute<F, T>(&self, task: F) -> Result<T, PipelineError>
    where
        F: Future<Output = Result<T, PipelineError>> + Send + 'static,
        T: Send + 'static,
    {
        // Hold a permit so at most `max_workers` tasks run at once
        let _permit = self.semaphore.acquire().await.unwrap();
        tokio::spawn(task).await.unwrap()
    }
}
```
Adaptive Worker Count
```rust
fn optimal_worker_count() -> usize {
    let cpu_count = num_cpus::get();

    // For I/O-bound: 2x CPU count
    // For CPU-bound: 1x CPU count
    // For mixed:     1.5x CPU count
    (cpu_count as f64 * 1.5) as usize
}

let worker_pool = WorkerPool::new(optimal_worker_count());
```
For detailed worker pool implementation, see Thread Pooling.
Resource Management
Efficient resource management is critical for concurrent systems.
Resource Limits
```rust
pub struct ResourceLimits {
    max_memory: usize,
    max_file_handles: usize,
    max_concurrent_tasks: usize,
}

impl ResourceLimits {
    pub fn calculate_max_parallel_chunks(&self, chunk_size: ChunkSize) -> usize {
        // How many chunks fit in the memory budget
        let memory_limit = self.max_memory / chunk_size.bytes();
        let task_limit = self.max_concurrent_tasks;
        memory_limit.min(task_limit)
    }
}
```
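As a concrete example, with a 1 GiB memory budget and 64 MB chunks the memory bound permits about 16 chunks in flight, so an 8-task cap becomes the limiting factor. A hypothetical usage sketch (field values are made up, the struct literal assumes module-local field visibility, and `ChunkSize::from_mb` is the constructor used earlier in this chapter):

```rust
// Hypothetical limits: 1 GiB of memory, at most 8 concurrent tasks
let limits = ResourceLimits {
    max_memory: 1024 * 1024 * 1024,
    max_file_handles: 256,
    max_concurrent_tasks: 8,
};

// 64 MB chunks: memory allows ~16 in flight, the task cap allows 8
let chunk_size = ChunkSize::from_mb(64)?;
assert_eq!(limits.calculate_max_parallel_chunks(chunk_size), 8);
```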
Resource Tracking
```rust
use std::sync::atomic::{AtomicUsize, Ordering};

pub struct ResourceTracker {
    active_tasks: AtomicUsize,
    memory_used: AtomicUsize,
}

impl ResourceTracker {
    pub fn acquire_task(&self) -> TaskGuard {
        self.active_tasks.fetch_add(1, Ordering::SeqCst);
        TaskGuard { tracker: self }
    }
}

// RAII guard: decrements the counter when dropped
pub struct TaskGuard<'a> {
    tracker: &'a ResourceTracker,
}

impl Drop for TaskGuard<'_> {
    fn drop(&mut self) {
        self.tracker.active_tasks.fetch_sub(1, Ordering::SeqCst);
    }
}
```
For detailed resource management, see Resource Management.
Concurrency Patterns
Common concurrency patterns used in the pipeline.
Pattern 1: Fan-Out/Fan-In
```rust
use futures::future::try_join_all;

// Fan-out: distribute work to multiple workers
let futures = chunks.into_iter().map(|chunk| {
    tokio::spawn(async move { process_chunk(chunk).await })
});

// Fan-in: collect the results
let results = try_join_all(futures).await?;
```
Pattern 2: Pipeline Pattern
```rust
use futures::stream::StreamExt;

// Stage 1 → Stage 2 → Stage 3
let result = read_stream(path)
    .map(|chunk| compress_chunk(chunk))
    .buffer_unordered(8)
    .map(|chunk| encrypt_chunk(chunk))
    .buffer_unordered(8)
    .collect::<Vec<_>>()
    .await;
```
Pattern 3: Worker Pool Pattern
```rust
let pool = WorkerPool::new(8);

for chunk in chunks {
    // Note: awaiting here processes chunks one at a time; spawn the
    // futures first (or use try_join_all) for real parallelism
    pool.execute(async move { process_chunk(chunk).await }).await?;
}
```
Pattern 4: Rate Limiting
```rust
use tokio::time::{interval, Duration};

let mut interval = interval(Duration::from_millis(100));

for chunk in chunks {
    interval.tick().await; // Rate limit: 10 chunks/sec
    process_chunk(chunk).await?;
}
```
Performance Considerations
Tokio Task Overhead
| Operation | Cost | Notes |
|---|---|---|
| Spawn task | ~1-2 μs | Very lightweight |
| Context switch | ~100 ns | Work-stealing scheduler |
| Mutex lock | ~50 ns | Uncontended case |
| Channel send | ~100-200 ns | Depends on channel type |
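These costs are rough orders of magnitude and vary with hardware and load. The spawn figure can be sanity-checked with a quick micro-benchmark along these lines (an illustrative sketch, not a rigorous benchmark):

```rust
use std::time::Instant;

#[tokio::main]
async fn main() {
    const N: u32 = 100_000;

    let start = Instant::now();
    let handles: Vec<_> = (0..N)
        .map(|_| tokio::spawn(async {}))
        .collect();
    for h in handles {
        h.await.unwrap();
    }

    // Includes join overhead, so this is an upper bound on spawn cost
    let per_task = start.elapsed() / N;
    println!("~{per_task:?} per task (spawn + join)");
}
```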
Choosing Concurrency Level
```rust
fn optimal_concurrency(
    file_size: u64,
    chunk_size: ChunkSize,
    available_memory: usize,
) -> usize {
    let num_chunks = (file_size / chunk_size.bytes() as u64) as usize;
    let memory_limit = available_memory / chunk_size.bytes();
    let cpu_limit = num_cpus::get() * 2;

    num_chunks.min(memory_limit).min(cpu_limit)
}
```
Avoiding Contention
```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Mutex;

// ❌ Bad: high contention on a single lock
let counter = Arc::new(Mutex::new(0));
for _ in 0..1000 {
    let c = counter.clone();
    tokio::spawn(async move {
        *c.lock().await += 1; // Lock contention!
    });
}

// ✅ Good: lock-free atomic reduces contention
let counter = Arc::new(AtomicUsize::new(0));
for _ in 0..1000 {
    let c = counter.clone();
    tokio::spawn(async move {
        c.fetch_add(1, Ordering::Relaxed); // Lock-free!
    });
}
```
Best Practices
1. Use Async for I/O, Sync for CPU
```rust
// ✅ Good: async I/O
async fn read_file(path: &Path) -> Result<Vec<u8>, Error> {
    tokio::fs::read(path).await
}

// ✅ Good: sync for CPU-bound work
fn compress_data(data: &[u8]) -> Result<Vec<u8>, Error> {
    brotli::compress(data) // Sync, CPU-bound (illustrative call)
}

// ❌ Bad: async wrapper around CPU-bound work
async fn compress_data_async(data: &[u8]) -> Result<Vec<u8>, Error> {
    // Unnecessary async overhead, and it still blocks the worker thread
    brotli::compress(data)
}
```
2. Spawn Blocking for Sync Code
```rust
// ✅ Good: offload sync work to a blocking thread
async fn process_chunk(chunk: FileChunk) -> Result<FileChunk, Error> {
    tokio::task::spawn_blocking(move || {
        // CPU-bound compression runs on the blocking thread pool
        compress_sync(chunk)
    })
    .await?
}
```
3. Limit Concurrent Tasks
```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// ✅ Good: bounded parallelism
let semaphore = Arc::new(Semaphore::new(max_concurrent));
for chunk in chunks {
    let semaphore = Arc::clone(&semaphore);
    tokio::spawn(async move {
        let _guard = semaphore.acquire().await.unwrap();
        process_chunk(chunk).await
    });
}

// ❌ Bad: unbounded parallelism
for chunk in chunks {
    tokio::spawn(async move {
        process_chunk(chunk).await // May spawn thousands of tasks!
    });
}
```
4. Use Channels for Communication
```rust
use tokio::sync::mpsc;

// ✅ Good: channel-based communication
let (tx, mut rx) = mpsc::channel(100);

tokio::spawn(async move {
    while let Some(chunk) = rx.recv().await {
        process_chunk(chunk).await;
    }
});

tx.send(chunk).await?;
```
5. Handle Errors Properly
```rust
// ✅ Good: handle both join errors and task errors
let results: Result<Vec<_>, _> = try_join_all(futures).await;

match results {
    Ok(chunks) => { /* success */ }
    Err(e) => {
        error!("Processing failed: {}", e);
        // Clean up resources before propagating
        return Err(e);
    }
}
```
Troubleshooting
Issue 1: Too Many Tokio Tasks
Symptom:
thread 'tokio-runtime-worker' stack overflow
Solutions:
```rust
// 1. Limit concurrent tasks
let semaphore = Arc::new(Semaphore::new(100));

// 2. Bound in-flight futures on a stream
stream.buffer_unordered(10).collect().await

// 3. Increase the worker stack size
tokio::runtime::Builder::new_multi_thread()
    .thread_stack_size(4 * 1024 * 1024) // 4 MB
    .build()?;
```
Issue 2: Mutex Deadlock
Symptom: Tasks hang indefinitely.
Solutions:
```rust
use std::ptr;

// 1. Always acquire locks in a consistent order (here: by address)
async fn transfer(from: &Mutex<u64>, to: &Mutex<u64>, amount: u64) {
    let (first, second) = if ptr::eq(from, to) {
        panic!("Same account");
    } else if (from as *const _ as usize) < (to as *const _ as usize) {
        (from, to)
    } else {
        (to, from)
    };

    let mut a = first.lock().await;
    let mut b = second.lock().await;
    // Transfer logic
}

// 2. Bound lock acquisition with a timeout
let guard = tokio::time::timeout(Duration::from_secs(5), mutex.lock()).await?;
```
Issue 3: Channel Backpressure
Symptom:
The producer outpaces the consumer, so queued chunks pile up and memory grows.
Solutions:
```rust
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;

// 1. Use a bounded channel
let (tx, rx) = mpsc::channel::<FileChunk>(100); // Max 100 in flight

// 2. Apply backpressure at the producer
match tx.try_send(chunk) {
    Ok(()) => { /* sent */ }
    Err(TrySendError::Full(_)) => {
        // Channel full: wait, then retry
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    Err(e) => return Err(e.into()),
}
```
Next Steps
After understanding the concurrency model, explore specific implementations:
Related Advanced Topics
- Thread Pooling: Worker pool implementation and optimization
- Resource Management: Memory and resource tracking
Related Topics
- Performance Optimization: Optimizing concurrent code
- File I/O: Async file operations
Summary
Key Takeaways:
- Async/Await provides non-blocking concurrency for I/O operations
- Tokio Runtime uses work-stealing for efficient task scheduling
- Parallel Processing enables concurrent chunk processing for throughput
- Concurrency Primitives (Mutex, RwLock, Semaphore) enable safe sharing
- Worker Pools manage bounded concurrent task execution
- Resource Management tracks and limits resource usage
- Patterns (fan-out/fan-in, pipeline, worker pool) structure concurrent code
Architecture File References:
- Pipeline Service:
pipeline/src/application/services/pipeline_service.rs:189
- File Processor:
pipeline/src/application/services/file_processor_service.rs:1