Data processing pipeline patterns
Data processing pipeline patterns are essential tools for handling and managing data in the modern world. They offer a structured approach to data processing, enabling data to flow efficiently from one stage to another, while minimizing bottlenecks and ensuring the quality of the end result. In this article, we will explore various data processing pipeline patterns and how they can be utilized in different scenarios.
Linear Data Processing Pipeline Pattern
Overview
The linear data processing pipeline pattern is the most basic and widely used pattern. It involves a series of stages, each responsible for a specific data processing task, with data flowing sequentially from one stage to the next.
Advantages
- Simple and easy to understand
- Efficient when tasks can be executed sequentially
Disadvantages
- Limited parallelism and scalability
- Not suitable for complex data processing scenarios
Code Example
Here's a practical implementation of a linear pipeline for processing customer data:
import pandas as pd
from typing import Any, Callable
class LinearPipeline:
def __init__(self):
self.stages = []
def add_stage(self, name: str, func: Callable):
"""Add a processing stage to the pipeline"""
self.stages.append((name, func))
return self
def execute(self, data: Any) -> Any:
"""Execute all stages sequentially"""
result = data
for name, func in self.stages:
print(f"Executing stage: {name}")
result = func(result)
return result
# Example usage: ETL pipeline
def extract_data(source: str) -> pd.DataFrame:
"""Extract data from source"""
return pd.read_csv(source)
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
"""Clean and transform data"""
df = df.dropna()
df['total_amount'] = df['price'] * df['quantity']
df['processed_date'] = pd.Timestamp.now()
return df
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
"""Validate data quality"""
assert not df.empty, "DataFrame is empty"
assert 'total_amount' in df.columns, "Missing total_amount column"
return df
def load_data(df: pd.DataFrame) -> str:
"""Load data to destination"""
output_path = 'output/processed_data.csv'
df.to_csv(output_path, index=False)
return f"Data saved to {output_path}"
# Build and execute pipeline
pipeline = LinearPipeline()
pipeline.add_stage("Extract", lambda x: extract_data(x))
pipeline.add_stage("Transform", transform_data)
pipeline.add_stage("Validate", validate_data)
pipeline.add_stage("Load", load_data)
result = pipeline.execute('input/customer_data.csv')
print(result)
Real-World Use Cases
- Log Processing: Parse logs -> Filter relevant entries -> Aggregate metrics -> Store results
- Data Migration: Extract from source -> Transform schema -> Validate -> Load to target
- Report Generation: Fetch data -> Calculate metrics -> Format report -> Send email
- Image Processing: Load image -> Resize -> Apply filters -> Compress -> Save
When to Use Linear Pattern
- Tasks have clear sequential dependencies
- Each stage requires output from previous stage
- Simple workflows with straightforward logic
- Small to medium data volumes
- No need for parallel processing
Branching Data Processing Pipeline Pattern
Overview
The branching data processing pipeline pattern introduces parallelism by allowing data to be processed in multiple branches simultaneously. Each branch represents a distinct data processing task, which can be executed concurrently to improve overall throughput.
Advantages
- Increased parallelism and scalability
- Better suited for complex data processing scenarios
Disadvantages
- Requires more sophisticated coordination and management
- Can be harder to debug and maintain
Code Example
Implementation of a branching pipeline with parallel processing:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Callable, Any
class BranchingPipeline:
def __init__(self, max_workers: int = 4):
self.branches = []
self.max_workers = max_workers
def add_branch(self, name: str, stages: List[Callable]):
"""Add a processing branch"""
self.branches.append((name, stages))
return self
def execute(self, data: Any) -> Dict[str, Any]:
"""Execute all branches in parallel"""
results = {}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all branches
future_to_branch = {
executor.submit(self._execute_branch, name, stages, data): name
for name, stages in self.branches
}
# Collect results
for future in as_completed(future_to_branch):
branch_name = future_to_branch[future]
try:
results[branch_name] = future.result()
print(f"Branch '{branch_name}' completed successfully")
except Exception as e:
print(f"Branch '{branch_name}' failed: {e}")
results[branch_name] = None
return results
def _execute_branch(self, name: str, stages: List[Callable], data: Any) -> Any:
"""Execute stages within a single branch"""
result = data
for stage in stages:
result = stage(result)
return result
# Example: Multi-channel analytics pipeline
def analyze_sales(df: pd.DataFrame) -> Dict:
"""Analyze sales metrics"""
return {
'total_sales': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'transaction_count': len(df)
}
def analyze_customers(df: pd.DataFrame) -> Dict:
"""Analyze customer behavior"""
return {
'unique_customers': df['customer_id'].nunique(),
'returning_customers': df[df['is_returning']].shape[0],
'new_customers': df[~df['is_returning']].shape[0]
}
def analyze_products(df: pd.DataFrame) -> Dict:
"""Analyze product performance"""
top_products = df.groupby('product_id')['amount'].sum().nlargest(5)
return {
'top_products': top_products.to_dict(),
'total_products_sold': df['product_id'].nunique()
}
def analyze_geography(df: pd.DataFrame) -> Dict:
"""Analyze geographic distribution"""
return {
'sales_by_region': df.groupby('region')['amount'].sum().to_dict(),
'top_region': df.groupby('region')['amount'].sum().idxmax()
}
# Build and execute branching pipeline
df = pd.read_csv('sales_data.csv')
pipeline = BranchingPipeline(max_workers=4)
pipeline.add_branch("Sales Analysis", [analyze_sales])
pipeline.add_branch("Customer Analysis", [analyze_customers])
pipeline.add_branch("Product Analysis", [analyze_products])
pipeline.add_branch("Geographic Analysis", [analyze_geography])
results = pipeline.execute(df)
# Combine results
for branch_name, result in results.items():
print(f"\n{branch_name}:")
print(result)
Architecture Pattern
Input Data
|
+---------------+---------------+
| | |
Branch A Branch B Branch C
| | |
[Stage 1] [Stage 1] [Stage 1]
| | |
[Stage 2] [Stage 2] [Stage 2]
| | |
+---------------+---------------+
|
Merge Results
Real-World Use Cases
- Multi-Format Data Export: Process data -> Branch to [JSON, CSV, Parquet, Database] simultaneously
- Social Media Analytics: Fetch posts -> Branch to [sentiment analysis, engagement metrics, trend detection]
- E-commerce Order Processing: Receive order -> Branch to [payment, inventory, shipping, notification]
- Machine Learning Pipeline: Load data -> Branch to [feature engineering A, B, C] -> Train multiple models
- Content Delivery: Upload media -> Branch to [thumbnail generation, multiple format conversions, metadata extraction]
When to Use Branching Pattern
- Independent processing paths needed
- Multiple outputs from same input
- Different analysis perspectives required
- Parallel execution improves performance
- Branches don't depend on each other's results
Looping Data Processing Pipeline Pattern
Overview
The looping data processing pipeline pattern introduces iterative processing, where data is processed in a loop until a specific condition is met. This pattern is particularly useful for handling data that requires multiple passes, such as iterative algorithms or machine learning models.
Advantages
- Allows for iterative processing
- Enables the refinement of data or model parameters
Disadvantages
- Can be computationally intensive
- Requires careful monitoring to avoid infinite loops
Code Example
Implementation of a looping pipeline with convergence detection:
import pandas as pd
import numpy as np
from typing import Any, Callable, Dict, Tuple
class LoopingPipeline:
def __init__(self, max_iterations: int = 100, tolerance: float = 1e-6):
self.max_iterations = max_iterations
self.tolerance = tolerance
self.stages = []
self.convergence_check = None
def add_stage(self, func: Callable):
"""Add a processing stage"""
self.stages.append(func)
return self
def set_convergence_check(self, func: Callable[[Any, Any], Tuple[bool, float]]):
"""Set function to check convergence (returns: has_converged, metric)"""
self.convergence_check = func
return self
def execute(self, initial_data: Any) -> Tuple[Any, Dict]:
"""Execute pipeline with looping until convergence"""
current_data = initial_data
previous_data = None
iteration = 0
converged = False  # stays False if no convergence check is configured
metrics = []
while iteration < self.max_iterations:
iteration += 1
previous_data = current_data
# Execute all stages
for stage in self.stages:
current_data = stage(current_data)
# Check convergence
if self.convergence_check:
converged, metric = self.convergence_check(current_data, previous_data)
metrics.append(metric)
print(f"Iteration {iteration}: metric = {metric:.6f}")
if converged:
print(f"Converged after {iteration} iterations")
break
else:
print(f"Iteration {iteration} completed")
if iteration >= self.max_iterations:
print(f"Reached maximum iterations ({self.max_iterations})")
return current_data, {
'iterations': iteration,
'converged': converged if self.convergence_check else None,
'metrics': metrics
}
# Example 1: Iterative data cleaning
def remove_outliers(df: pd.DataFrame) -> pd.DataFrame:
"""Remove statistical outliers"""
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
return df
def impute_missing(df: pd.DataFrame) -> pd.DataFrame:
"""Impute missing values"""
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean())
return df
def convergence_check_data_quality(current: pd.DataFrame, previous: pd.DataFrame) -> Tuple[bool, float]:
"""Check if data quality has stabilized"""
if previous is None:
return False, float('inf')
# Compare row counts
size_diff = abs(len(current) - len(previous))
converged = size_diff < 10 # Less than 10 rows difference
return converged, size_diff
# Example 2: Iterative algorithm (Gradient Descent)
def gradient_descent_step(params: np.ndarray) -> np.ndarray:
"""Single gradient descent step"""
learning_rate = 0.01
# Example: Minimize f(x) = x^2 + y^2
gradient = 2 * params # Gradient of f
params = params - learning_rate * gradient
return params
def convergence_check_parameters(current: np.ndarray, previous: np.ndarray) -> Tuple[bool, float]:
"""Check parameter convergence"""
if previous is None:
return False, float('inf')
diff = np.linalg.norm(current - previous)
converged = diff < 1e-6
return converged, diff
# Usage example 1: Data cleaning loop
df = pd.read_csv('noisy_data.csv')
pipeline = LoopingPipeline(max_iterations=50)
pipeline.add_stage(remove_outliers)
pipeline.add_stage(impute_missing)
pipeline.set_convergence_check(convergence_check_data_quality)
cleaned_df, info = pipeline.execute(df)
print(f"Cleaned {len(df) - len(cleaned_df)} rows in {info['iterations']} iterations")
# Usage example 2: Optimization loop
initial_params = np.array([10.0, 10.0])
pipeline = LoopingPipeline(max_iterations=1000, tolerance=1e-6)
pipeline.add_stage(gradient_descent_step)
pipeline.set_convergence_check(convergence_check_parameters)
final_params, info = pipeline.execute(initial_params)
print(f"Optimized parameters: {final_params}")
Real-World Use Cases
- Machine Learning Training: Iterate through epochs until model converges
- Iterative Data Cleaning: Remove outliers -> Recalculate statistics -> Repeat until stable
- Graph Algorithms: PageRank, community detection with iterative refinement
- Optimization Problems: Gradient descent, simulated annealing
- ETL Reconciliation: Load data -> Validate -> Fix errors -> Reload until consistent
When to Use Looping Pattern
- Algorithms require multiple iterations
- Quality improves with repeated processing
- Convergence criteria can be defined
- Processing depends on previous iteration results
- Optimization or refinement needed
Anti-Pattern to Avoid
Infinite Loop Risk: Always implement these safeguards (a minimal sketch follows the list):
- Maximum iteration limit
- Convergence criteria
- Progress logging
- Timeout mechanism
- Circuit breaker for stuck iterations
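A minimal sketch of these safeguards wrapped around a generic loop; the helper names (run_with_safeguards, step, has_converged) are illustrative, not part of any framework:
import time

def run_with_safeguards(step, initial, has_converged,
                        max_iterations=100, timeout_seconds=300):
    """Iterate step() with an iteration cap, convergence check, progress logging, and a wall-clock timeout."""
    start = time.monotonic()
    current, previous = initial, None
    for iteration in range(1, max_iterations + 1):  # maximum iteration limit
        previous, current = current, step(current)
        print(f"Iteration {iteration} finished")  # progress logging
        if previous is not None and has_converged(current, previous):  # convergence criteria
            print(f"Converged after {iteration} iterations")
            return current
        if time.monotonic() - start > timeout_seconds:  # timeout acts as the circuit breaker of last resort
            raise TimeoutError(f"Loop exceeded {timeout_seconds}s after {iteration} iterations")
    print(f"Stopped at max_iterations ({max_iterations}) without converging")
    return current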
Parallel Data Processing Pipeline Pattern
Overview
The parallel data processing pipeline pattern involves executing multiple instances of the same pipeline concurrently. This approach is beneficial for cases where multiple data sources need to be processed using the same pipeline or when the processing load needs to be distributed across multiple resources. Frameworks like Apache Spark enable parallel data processing at scale.
Advantages
- High parallelism and throughput
- Enables load distribution and resource optimization
Disadvantages
- Requires advanced coordination and management
- Complexity increases with the number of parallel instances
Code Example
Implementation using multiprocessing and Spark:
import pandas as pd
import numpy as np
from multiprocessing import Pool, cpu_count
from functools import partial
from typing import List, Callable, Any
class ParallelPipeline:
def __init__(self, num_workers: int = None):
self.num_workers = num_workers or cpu_count()
self.stages = []
def add_stage(self, func: Callable):
"""Add processing stage"""
self.stages.append(func)
return self
def process_single(self, data: Any) -> Any:
"""Process single data item through all stages"""
result = data
for stage in self.stages:
result = stage(result)
return result
def execute(self, data_list: List[Any]) -> List[Any]:
"""Execute pipeline on multiple data items in parallel"""
print(f"Processing {len(data_list)} items with {self.num_workers} workers")
with Pool(self.num_workers) as pool:
results = pool.map(self.process_single, data_list)
return results
# Example 1: Parallel file processing
def load_file(filepath: str) -> pd.DataFrame:
"""Load and parse file"""
return pd.read_csv(filepath)
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
"""Clean data"""
df = df.dropna()
df = df.drop_duplicates()
return df
def aggregate_data(df: pd.DataFrame) -> pd.DataFrame:
"""Aggregate metrics"""
return df.groupby('category').agg({
'value': ['sum', 'mean', 'count']
})
def save_results(df: pd.DataFrame) -> str:
"""Save processed results"""
output_path = f"output/processed_{hash(str(df))}.csv"
df.to_csv(output_path)
return output_path
# Process multiple files in parallel
file_list = [f'data/file_{i}.csv' for i in range(100)]
pipeline = ParallelPipeline(num_workers=8)
pipeline.add_stage(load_file)
pipeline.add_stage(clean_data)
pipeline.add_stage(aggregate_data)
pipeline.add_stage(save_results)
results = pipeline.execute(file_list)
print(f"Processed {len(results)} files")
# Example 2: Using Apache Spark for massive parallelism
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count
def create_spark_parallel_pipeline():
"""Create Spark-based parallel pipeline"""
spark = SparkSession.builder \
.appName("Parallel Pipeline") \
.config("spark.executor.instances", "10") \
.config("spark.executor.cores", "4") \
.config("spark.executor.memory", "4g") \
.getOrCreate()
# Read multiple files in parallel
df = spark.read.csv("data/*.csv", header=True, inferSchema=True)
# Parallel transformations
df_cleaned = df.dropna() \
.dropDuplicates() \
.filter(col("value") > 0)
# Parallel aggregation
df_aggregated = df_cleaned.groupBy("category") \
.agg(
sum("value").alias("total"),
avg("value").alias("average"),
count("*").alias("count")
)
# Parallel write
df_aggregated.write.mode("overwrite") \
.partitionBy("category") \
.parquet("output/results")
spark.stop()
return "Processing complete"
# Example 3: Parallel data partitioning
def expensive_computation(x: float) -> float:
    """Simulate expensive computation"""
    return np.sqrt(x ** 2 + np.sin(x) ** 2)

def process_partition(partition_df: pd.DataFrame) -> pd.DataFrame:
    """Process a single partition (kept at module level so multiprocessing can pickle it)"""
    # Apply expensive transformations
    partition_df['computed_value'] = partition_df['value'].apply(expensive_computation)
    return partition_df

def partition_and_process(df: pd.DataFrame, num_partitions: int = 8) -> pd.DataFrame:
    """Partition a DataFrame and process the partitions in parallel"""
    # Split into partitions
    partitions = np.array_split(df, num_partitions)
    # Process partitions in parallel
    with Pool(num_partitions) as pool:
        processed_partitions = pool.map(process_partition, partitions)
    # Combine results
    return pd.concat(processed_partitions, ignore_index=True)
# Usage
large_df = pd.DataFrame({'value': np.random.randn(1000000)})
result = partition_and_process(large_df, num_partitions=8)
Architecture Pattern
Input Data (Multiple Sources/Partitions)
|
+----> Pipeline Instance 1 (Worker 1)
|
+----> Pipeline Instance 2 (Worker 2)
|
+----> Pipeline Instance 3 (Worker 3)
|
+----> Pipeline Instance N (Worker N)
|
+----> Merge/Collect Results
|
Output Data
Real-World Use Cases
- Batch Processing: Process thousands of files simultaneously
- Distributed ETL: Extract from multiple databases in parallel using Apache Spark
- Web Scraping: Scrape multiple websites concurrently
- Image Processing: Process image library across multiple cores
- Log Analysis: Analyze log files from multiple servers in parallel
- Time Series Processing: Process different time periods independently
When to Use Parallel Pattern
- Large volumes of independent data items
- Each item can be processed independently
- Processing time per item is significant
- Hardware has multiple cores/machines available
- No dependencies between parallel executions
- Results can be merged or aggregated
Scalability Considerations
Vertical Scaling (Single Machine):
- Use multiprocessing for CPU-bound tasks
- Use threading for I/O-bound tasks
- Optimal workers: roughly one per CPU core for CPU-bound work
- Optimal workers: roughly 2-4x CPU cores for I/O-bound work (see the sketch at the end of this section)
Horizontal Scaling (Multiple Machines):
- Use distributed frameworks (Spark, Dask, Ray)
- Implement proper data partitioning strategy
- Consider network overhead
- Handle fault tolerance and retries
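The vertical-scaling guidance above can be captured in a small helper. This is an illustrative sketch, not a standard API; the workload labels and the 4x multiplier are assumptions to tune for your own workload:
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def make_executor(workload: str):
    """Pick an executor type and worker count based on the kind of work."""
    cores = os.cpu_count() or 1
    if workload == "cpu_bound":
        # Separate processes sidestep the GIL; roughly one worker per core
        return ProcessPoolExecutor(max_workers=cores)
    # Threads are cheap for I/O-bound work (network, disk); 2-4x cores is a common starting point
    return ThreadPoolExecutor(max_workers=cores * 4)

# Hypothetical usage:
# with make_executor("io_bound") as pool:
#     results = list(pool.map(fetch_url, urls))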
Hybrid Data Processing Pipeline Pattern
Overview
The hybrid data processing pipeline pattern combines multiple pipeline patterns to create a more complex and flexible solution. This approach is useful when dealing with data processing tasks that have varying requirements and need to be executed in parallel or iteratively.
Advantages
- Offers a highly customizable solution
- Can handle complex data processing scenarios
Disadvantages
- Increased complexity and maintenance
- Requires a deep understanding of the underlying patterns
Code Example
Complex implementation combining multiple patterns:
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
from enum import Enum
class PipelineType(Enum):
LINEAR = "linear"
PARALLEL = "parallel"
BRANCHING = "branching"
LOOPING = "looping"
@dataclass
class PipelineStage:
name: str
func: Callable
pipeline_type: PipelineType
config: Dict = None
class HybridPipeline:
def __init__(self):
self.stages = []
def add_linear_stage(self, name: str, func: Callable):
"""Add sequential processing stage"""
self.stages.append(PipelineStage(name, func, PipelineType.LINEAR))
return self
def add_parallel_stage(self, name: str, func: Callable, num_workers: int = 4):
"""Add parallel processing stage"""
config = {'num_workers': num_workers}
self.stages.append(PipelineStage(name, func, PipelineType.PARALLEL, config))
return self
def add_branching_stage(self, name: str, branches: Dict[str, Callable]):
"""Add branching stage with multiple paths"""
config = {'branches': branches}
self.stages.append(PipelineStage(name, None, PipelineType.BRANCHING, config))
return self
def add_looping_stage(self, name: str, func: Callable, max_iterations: int = 10):
"""Add iterative processing stage"""
config = {'max_iterations': max_iterations}
self.stages.append(PipelineStage(name, func, PipelineType.LOOPING, config))
return self
def execute(self, data: Any) -> Any:
"""Execute hybrid pipeline"""
result = data
for stage in self.stages:
print(f"\nExecuting {stage.pipeline_type.value} stage: {stage.name}")
if stage.pipeline_type == PipelineType.LINEAR:
result = self._execute_linear(stage, result)
elif stage.pipeline_type == PipelineType.PARALLEL:
result = self._execute_parallel(stage, result)
elif stage.pipeline_type == PipelineType.BRANCHING:
result = self._execute_branching(stage, result)
elif stage.pipeline_type == PipelineType.LOOPING:
result = self._execute_looping(stage, result)
return result
def _execute_linear(self, stage: PipelineStage, data: Any) -> Any:
"""Execute linear stage"""
return stage.func(data)
def _execute_parallel(self, stage: PipelineStage, data: List[Any]) -> List[Any]:
"""Execute parallel stage"""
num_workers = stage.config['num_workers']
with ProcessPoolExecutor(max_workers=num_workers) as executor:
results = list(executor.map(stage.func, data))
return results
def _execute_branching(self, stage: PipelineStage, data: Any) -> Dict[str, Any]:
"""Execute branching stage"""
branches = stage.config['branches']
results = {}
with ThreadPoolExecutor(max_workers=len(branches)) as executor:
future_to_branch = {
executor.submit(func, data): name
for name, func in branches.items()
}
for future in future_to_branch:
branch_name = future_to_branch[future]
results[branch_name] = future.result()
return results
def _execute_looping(self, stage: PipelineStage, data: Any) -> Any:
"""Execute looping stage"""
max_iterations = stage.config['max_iterations']
result = data
for i in range(max_iterations):
previous = result
result = stage.func(result)
# Check convergence
if self._has_converged(result, previous):
print(f"Converged after {i+1} iterations")
break
return result
def _has_converged(self, current: Any, previous: Any, tolerance: float = 1e-6) -> bool:
"""Check if processing has converged"""
if isinstance(current, (int, float)) and isinstance(previous, (int, float)):
return abs(current - previous) < tolerance
return False
# Complex real-world example: E-commerce data pipeline
def load_raw_data(source: str) -> pd.DataFrame:
"""Linear: Load data"""
print("Loading raw data...")
return pd.read_csv(source)
def clean_partition(df_partition: pd.DataFrame) -> pd.DataFrame:
"""Parallel: Clean data partition"""
df_partition = df_partition.dropna()
df_partition = df_partition.drop_duplicates()
df_partition['processed_at'] = pd.Timestamp.now()
return df_partition
def analyze_sales(df: pd.DataFrame) -> Dict:
"""Branch: Sales analysis"""
return {
'total_revenue': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'orders': len(df)
}
def analyze_customers(df: pd.DataFrame) -> Dict:
"""Branch: Customer analysis"""
return {
'unique_customers': df['customer_id'].nunique(),
'repeat_rate': (df['customer_id'].value_counts() > 1).sum() / df['customer_id'].nunique()
}
def analyze_products(df: pd.DataFrame) -> Dict:
"""Branch: Product analysis"""
top_products = df.groupby('product_id')['amount'].sum().nlargest(10)
return {
'top_products': top_products.to_dict(),
'products_sold': df['product_id'].nunique()
}
def optimize_recommendations(recommendations: Dict) -> Dict:
"""Loop: Iteratively improve recommendations"""
# Simulate optimization
score = recommendations.get('score', 0.5)
recommendations['score'] = min(score + 0.1, 1.0)
return recommendations
def generate_report(results: Dict) -> str:
"""Linear: Final report generation"""
report = "=== Data Processing Report ===\n"
for key, value in results.items():
report += f"{key}: {value}\n"
return report
# Build and execute hybrid pipeline
pipeline = HybridPipeline()
# Stage 1: Linear - Load data
pipeline.add_linear_stage("Load Data", load_raw_data)
# Stage 2: Linear - Split the DataFrame into partitions so the next stage can run in parallel
pipeline.add_linear_stage("Partition", lambda df: np.array_split(df, 4))
# Stage 3: Parallel - Clean data partitions
pipeline.add_parallel_stage("Clean Data", clean_partition, num_workers=4)
# Stage 4: Linear - Combine partitions
pipeline.add_linear_stage("Combine", lambda partitions: pd.concat(partitions, ignore_index=True))
# Stage 5: Branching - Multiple analyses
pipeline.add_branching_stage("Analysis", {
    'sales': analyze_sales,
    'customers': analyze_customers,
    'products': analyze_products
})
# Stage 6: Looping - Optimize recommendations
pipeline.add_looping_stage("Optimize", optimize_recommendations, max_iterations=5)
# Stage 7: Linear - Generate report
pipeline.add_linear_stage("Report", generate_report)
# Execute pipeline
result = pipeline.execute('ecommerce_data.csv')
print("\nFinal Result:")
print(result)
Architecture Diagram Description
Input Data
|
v
[Linear: Extract] --------> Load raw data
|
v
[Parallel: Transform] ----> +---> Worker 1: Clean partition 1
+---> Worker 2: Clean partition 2
+---> Worker 3: Clean partition 3
+---> Worker N: Clean partition N
|
v
[Linear: Merge] ----------> Combine all partitions
|
v
[Branching: Analyze] -----> +---> Branch A: Sales analysis
+---> Branch B: Customer analysis
+---> Branch C: Product analysis
|
v
[Linear: Aggregate] -------> Merge branch results
|
v
[Looping: Optimize] -------> Iterate until converged
| (improve model/recommendations)
v
[Linear: Load] ------------> Save to destination
|
v
Output Data
Real-World Use Cases
ML Training Pipeline:
- Linear: Load dataset
- Parallel: Feature engineering on data chunks
- Branching: Train multiple model types (RF, XGBoost, Neural Net)
- Looping: Hyperparameter optimization
- Linear: Select best model and deploy
Real-time Analytics Platform:
- Linear: Ingest streaming data
- Branching: Route to [real-time dashboard, batch storage, alerting]
- Parallel: Process multiple streams
- Looping: Adaptive threshold calculation
- Linear: Aggregate and store
Content Processing:
- Linear: Upload media file
- Branching: Extract [metadata, thumbnails, transcripts]
- Parallel: Generate multiple quality versions
- Linear: Quality validation
- Looping: Retry failed conversions
- Linear: Publish to CDN
Financial Data Pipeline:
- Linear: Fetch market data
- Parallel: Process multiple exchanges
- Branching: Calculate [technical indicators, risk metrics, predictions]
- Looping: Backtesting with parameter optimization
- Linear: Generate trading signals
When to Use Hybrid Pattern
- Complex workflows with varied requirements
- Different stages benefit from different patterns
- Need flexibility to optimize each component
- Processing requirements change based on data characteristics
- System must handle multiple data types/sources
Combining Patterns: Decision Matrix
| Stage Characteristics | Recommended Pattern |
|---|---|
| Sequential dependencies | Linear |
| Independent data items | Parallel |
| Multiple outputs needed | Branching |
| Iterative refinement | Looping |
| Mixed requirements | Hybrid |
Monitoring and Error Handling Strategies
Robust pipelines require comprehensive monitoring and error handling:
Logging Strategy
import logging
import json
from datetime import datetime
from typing import Dict
class PipelineLogger:
def __init__(self, pipeline_name: str):
self.pipeline_name = pipeline_name
self.logger = logging.getLogger(pipeline_name)
self.logger.setLevel(logging.INFO)
# File handler
fh = logging.FileHandler(f'logs/{pipeline_name}_{datetime.now():%Y%m%d}.log')
fh.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
self.logger.addHandler(fh)
def log_stage_start(self, stage_name: str, input_size: int = None):
self.logger.info(f"Starting stage: {stage_name}, Input size: {input_size}")
def log_stage_end(self, stage_name: str, output_size: int = None, duration: float = None):
    duration_str = f"{duration:.2f}s" if duration is not None else "n/a"
    self.logger.info(
        f"Completed stage: {stage_name}, Output size: {output_size}, Duration: {duration_str}"
    )
def log_error(self, stage_name: str, error: Exception):
self.logger.error(f"Error in stage {stage_name}: {str(error)}", exc_info=True)
def log_metrics(self, stage_name: str, metrics: Dict):
self.logger.info(f"Metrics for {stage_name}: {json.dumps(metrics)}")
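A brief usage sketch for the logger above (the pipeline name, sizes, and metrics are made up for illustration, and a logs/ directory is assumed to exist):
logger = PipelineLogger("sales_etl")
logger.log_stage_start("Extract", input_size=10_000)
# ... run the extract stage ...
logger.log_stage_end("Extract", output_size=9_500, duration=12.3)
logger.log_metrics("Extract", {"rows_dropped": 500, "source": "sales.csv"})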
Error Handling Framework
from typing import Any, Callable, Dict, Optional
from enum import Enum
class ErrorStrategy(Enum):
FAIL_FAST = "fail_fast" # Stop on first error
SKIP = "skip" # Skip failed items
RETRY = "retry" # Retry with backoff
FALLBACK = "fallback" # Use fallback function
class RobustPipeline:
def __init__(self, error_strategy: ErrorStrategy = ErrorStrategy.FAIL_FAST):
self.error_strategy = error_strategy
self.stages = []
self.logger = PipelineLogger("RobustPipeline")
def add_stage(
self,
name: str,
func: Callable,
retry_count: int = 3,
fallback_func: Optional[Callable] = None
):
self.stages.append({
'name': name,
'func': func,
'retry_count': retry_count,
'fallback_func': fallback_func
})
return self
def execute(self, data: Any) -> Any:
result = data
for stage in self.stages:
try:
result = self._execute_stage_with_retry(stage, result)
except Exception as e:
result = self._handle_error(stage, result, e)
return result
def _execute_stage_with_retry(self, stage: Dict, data: Any) -> Any:
"""Execute stage with retry logic"""
import time
for attempt in range(stage['retry_count']):
try:
self.logger.log_stage_start(stage['name'])
start_time = time.time()
result = stage['func'](data)
duration = time.time() - start_time
self.logger.log_stage_end(stage['name'], duration=duration)
return result
except Exception as e:
if attempt < stage['retry_count'] - 1:
wait_time = 2 ** attempt # Exponential backoff
self.logger.logger.warning(
f"Attempt {attempt + 1} failed for {stage['name']}, "
f"retrying in {wait_time}s..."
)
time.sleep(wait_time)
else:
raise e
def _handle_error(self, stage: Dict, data: Any, error: Exception) -> Any:
"""Handle errors based on strategy"""
self.logger.log_error(stage['name'], error)
if self.error_strategy == ErrorStrategy.FAIL_FAST:
raise error
elif self.error_strategy == ErrorStrategy.SKIP:
self.logger.logger.warning(f"Skipping stage {stage['name']}")
return data
elif self.error_strategy == ErrorStrategy.FALLBACK:
if stage['fallback_func']:
self.logger.logger.info(f"Using fallback for {stage['name']}")
return stage['fallback_func'](data)
else:
raise error
return data
# Example usage
def risky_operation(data: pd.DataFrame) -> pd.DataFrame:
"""Operation that might fail"""
if len(data) == 0:
raise ValueError("Empty DataFrame")
return data.apply(lambda x: x * 2)
def safe_fallback(data: pd.DataFrame) -> pd.DataFrame:
"""Fallback if main operation fails"""
return data.fillna(0)
pipeline = RobustPipeline(error_strategy=ErrorStrategy.FALLBACK)
pipeline.add_stage("Risky Transform", risky_operation, retry_count=3, fallback_func=safe_fallback)
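Running it on a small frame (an illustrative example; the fallback only kicks in if the risky transform actually raises):
df = pd.DataFrame({'value': [1.0, None, 3.0]})
result = pipeline.execute(df)  # retries with backoff on failure, then falls back to fillna(0)
print(result)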
Monitoring Metrics
import time
import pandas as pd
from dataclasses import dataclass, field
from typing import Any, List
@dataclass
class StageMetrics:
stage_name: str
start_time: float
end_time: float = 0
input_size: int = 0
output_size: int = 0
errors: List[str] = field(default_factory=list)
@property
def duration(self) -> float:
return self.end_time - self.start_time if self.end_time else 0
@property
def throughput(self) -> float:
return self.input_size / self.duration if self.duration > 0 else 0
class MonitoredPipeline:
def __init__(self):
self.stages = []
self.metrics = []
def execute_with_monitoring(self, data: Any) -> tuple[Any, List[StageMetrics]]:
"""Execute and collect detailed metrics"""
result = data
for stage_name, stage_func in self.stages:
metrics = StageMetrics(
stage_name=stage_name,
start_time=time.time(),
input_size=self._get_size(result)
)
try:
result = stage_func(result)
metrics.output_size = self._get_size(result)
except Exception as e:
metrics.errors.append(str(e))
raise
finally:
metrics.end_time = time.time()
self.metrics.append(metrics)
return result, self.metrics
def _get_size(self, data: Any) -> int:
"""Get data size"""
if isinstance(data, pd.DataFrame):
return len(data)
elif isinstance(data, (list, tuple)):
return len(data)
return 1
def print_performance_report(self):
"""Print detailed performance metrics"""
print("\n=== Pipeline Performance Report ===")
total_duration = sum(m.duration for m in self.metrics)
for metrics in self.metrics:
print(f"\nStage: {metrics.stage_name}")
print(f" Duration: {metrics.duration:.3f}s ({metrics.duration/total_duration*100:.1f}%)")
print(f" Input size: {metrics.input_size}")
print(f" Output size: {metrics.output_size}")
print(f" Throughput: {metrics.throughput:.2f} items/sec")
if metrics.errors:
print(f" Errors: {len(metrics.errors)}")
print(f"\nTotal Duration: {total_duration:.3f}s")
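MonitoredPipeline stores its stages as (name, function) tuples, so a usage sketch might look like this (the stage functions here are trivial placeholders):
pipeline = MonitoredPipeline()
pipeline.stages.append(("Clean", lambda df: df.dropna()))
pipeline.stages.append(("Double", lambda df: df * 2))
result, stage_metrics = pipeline.execute_with_monitoring(
    pd.DataFrame({'value': [1.0, None, 3.0]})
)
pipeline.print_performance_report()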
Testing Pipeline Patterns
Comprehensive testing ensures pipeline reliability:
import unittest
import numpy as np
import pandas as pd
from unittest.mock import patch
class TestLinearPipeline(unittest.TestCase):
def setUp(self):
self.pipeline = LinearPipeline()
self.sample_data = pd.DataFrame({'value': [1, 2, 3]})
def test_empty_pipeline(self):
"""Test pipeline with no stages"""
result = self.pipeline.execute(self.sample_data)
pd.testing.assert_frame_equal(result, self.sample_data)
def test_single_stage(self):
"""Test pipeline with one stage"""
self.pipeline.add_stage("double", lambda df: df * 2)
result = self.pipeline.execute(self.sample_data)
expected = pd.DataFrame({'value': [2, 4, 6]})
pd.testing.assert_frame_equal(result, expected)
def test_multiple_stages(self):
"""Test pipeline with multiple stages"""
self.pipeline.add_stage("double", lambda df: df * 2)
self.pipeline.add_stage("add_one", lambda df: df + 1)
result = self.pipeline.execute(self.sample_data)
expected = pd.DataFrame({'value': [3, 5, 7]})
pd.testing.assert_frame_equal(result, expected)
def test_stage_error_handling(self):
"""Test error propagation"""
def failing_stage(df):
raise ValueError("Test error")
self.pipeline.add_stage("failing", failing_stage)
with self.assertRaises(ValueError):
self.pipeline.execute(self.sample_data)
class TestParallelPipeline(unittest.TestCase):
def setUp(self):
self.pipeline = ParallelPipeline(num_workers=2)
def test_parallel_execution(self):
"""Test parallel processing of multiple items"""
def process(x):
return x * 2
self.pipeline.add_stage(process)
result = self.pipeline.execute([1, 2, 3, 4])
self.assertEqual(result, [2, 4, 6, 8])
@patch('multiprocessing.Pool')
def test_worker_count(self, mock_pool):
"""Test correct number of workers"""
pipeline = ParallelPipeline(num_workers=4)
# Verify pool created with correct workers
# ... implementation details
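# (Illustrative addition) A similar test for the BranchingPipeline defined earlier;
# lambdas are fine here because branches run in threads, not separate processes.
class TestBranchingPipeline(unittest.TestCase):
    def test_branches_run_independently(self):
        """Each branch receives the same input and returns its own result"""
        pipeline = BranchingPipeline(max_workers=2)
        pipeline.add_branch("double", [lambda x: x * 2])
        pipeline.add_branch("negate", [lambda x: -x])
        results = pipeline.execute(10)
        self.assertEqual(results["double"], 20)
        self.assertEqual(results["negate"], -10)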
# Integration test
class TestHybridPipeline(unittest.TestCase):
def test_end_to_end(self):
"""Test complete pipeline execution"""
pipeline = HybridPipeline()
# Add various stages (clean_partition is the module-level function from the hybrid example;
# parallel stages run in separate processes, so their functions must be picklable)
pipeline.add_linear_stage("load", pd.read_csv)
pipeline.add_linear_stage("partition", lambda df: np.array_split(df, 2))
pipeline.add_parallel_stage("process", clean_partition, num_workers=2)
# Execute and verify
result = pipeline.execute('test_data.csv')
self.assertIsNotNone(result)
if __name__ == '__main__':
unittest.main()
Anti-Patterns to Avoid
Common mistakes when implementing data pipelines:
1. The God Pipeline
Problem: Single massive pipeline doing everything
# BAD: One pipeline with 50 stages
pipeline.add_stage("load").add_stage("clean").add_stage("transform1") # ... 47 more
Solution: Break into logical sub-pipelines
# GOOD: Modular pipelines
ingestion_pipeline = create_ingestion_pipeline()
transformation_pipeline = create_transformation_pipeline()
validation_pipeline = create_validation_pipeline()
2. Silent Failures
Problem: Errors swallowed without logging
# BAD
try:
process_data(data)
except:
pass # Error lost!
Solution: Always log and handle errors appropriately
# GOOD
try:
process_data(data)
except Exception as e:
logger.error(f"Processing failed: {e}")
raise
3. No Idempotency
Problem: Re-running pipeline produces different results
# BAD: Using timestamps
df['processed_at'] = datetime.now()
Solution: Make operations idempotent
# GOOD: Deterministic processing
df['processed_at'] = pd.to_datetime(df['event_date']).dt.date
4. Memory Leaks
Problem: Not releasing resources
# BAD
large_datasets = []
for file in files:
large_datasets.append(pd.read_csv(file)) # Memory grows
Solution: Process data in chunks or clean up explicitly (a chunked-read sketch follows the snippet below)
# GOOD
for file in files:
df = pd.read_csv(file)
process(df)
del df # Explicit cleanup
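If a single file is too large for memory, pandas can also stream it in fixed-size chunks; a minimal sketch using the same files and process function as above:
# GOOD: Stream large files chunk by chunk
for file in files:
    for chunk in pd.read_csv(file, chunksize=100_000):
        process(chunk)  # each chunk is an ordinary DataFrame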
5. Tight Coupling
Problem: Stages directly depend on each other
# BAD: Stage B knows about Stage A internals
def stage_b(data):
# Assumes exact structure from stage_a
return data['stage_a_specific_column']
Solution: Use contracts/interfaces
# GOOD: Well-defined interfaces
def stage_b(data: pd.DataFrame) -> pd.DataFrame:
# Validates expected columns exist
required_columns = ['id', 'value']
assert all(col in data.columns for col in required_columns)
return data[required_columns]
Conclusion
Data processing pipeline patterns play a crucial role in designing and implementing efficient data processing solutions. Understanding the strengths and limitations of each pattern is key to selecting the best approach for a given scenario. By leveraging these patterns, data engineers can create highly efficient, scalable, and robust data processing pipelines that meet the ever-growing demands of the modern data landscape.
Pattern Selection Guide
Use this decision tree to choose the right pattern:
Is data processing sequential with clear dependencies?
- Yes -> Linear Pipeline
- No -> Continue
Do you need multiple independent analyses of the same data?
- Yes -> Branching Pipeline
- No -> Continue
Is the processing iterative, or does it require convergence?
- Yes -> Looping Pipeline
- No -> Continue
Can data be partitioned and processed independently?
- Yes -> Parallel Pipeline
- No -> Continue
Do you need a combination of multiple patterns?
- Yes -> Hybrid Pipeline
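The same decision tree can be written as a small helper function; this is purely illustrative:
def choose_pattern(sequential: bool, multiple_analyses: bool,
                   iterative: bool, partitionable: bool) -> str:
    """Walk the decision tree above and return a pattern name."""
    if sequential:
        return "Linear"
    if multiple_analyses:
        return "Branching"
    if iterative:
        return "Looping"
    if partitionable:
        return "Parallel"
    return "Hybrid"

# Example: independent analyses of the same dataset -> "Branching"
print(choose_pattern(sequential=False, multiple_analyses=True,
                     iterative=False, partitionable=False))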
Remember: Start simple and evolve. Begin with linear patterns and introduce complexity only when requirements demand it.
Related Topics
- Apache Airflow - Implement these patterns with workflow orchestration
- Apache Spark - Parallel processing framework
- Top Data Engineering Tools - Tools for building pipelines
- Pandas - Data transformation library