Chained Processing

Chained processing lets you transform data through sequential route operations.

Pattern Overview

This pattern:

  • Sequential Operations: Chain multiple transformations
  • Data Validation: Filter and validate data at each step
  • Conditional Logic: Route data based on conditions
  • Pipeline Composition: Build reusable components

Key Components

Routes

  • map(): Transform data through functions
  • filter(): Conditionally continue processing
  • stream(): Process generators that yield multiple values
  • Pipeline Chaining: Link operations for complex workflows

Processing Functions

  • Pure Functions: Transform data without side effects
  • Async Support: Handle I/O operations in the pipeline
  • Error Handling: Graceful failure handling at each step
  • Type Safety: Maintain type consistency through transformations

Basic Example

# Multi-step form processing pipeline: parse, validate, total, then confirm.
(
    bridge.on(FormSubmitted)
    .map(parse_form_data)                     # raw payload -> FormData
    .filter(lambda form: form.is_complete)    # drop partial submissions
    .map(validate_fields)                     # populates errors / is_valid
    .filter(lambda form: form.is_valid)       # drop invalid submissions
    .map(calculate_totals)
    .map(generate_confirmation)
    .broadcast(FormProcessed)
)
# Helper functions for each step
def parse_form_data(message):
    """Convert the raw submitted payload into a structured FormData."""
    raw = message.event.form_data
    return FormData(
        name=raw.get('name'),
        email=raw.get('email'),
        items=raw.get('items', []),
    )
def validate_fields(form):
    """Check required fields, recording problems on the form itself."""
    form.errors = []

    if not form.name:
        form.errors.append("Name is required")
    if not is_valid_email(form.email):
        form.errors.append("Invalid email format")

    # Valid exactly when no errors were recorded.
    form.is_valid = not form.errors
    return form
def calculate_totals(form, tax_rate=0.08):
    """Compute subtotal, tax, and grand total for the form's line items.

    Args:
        form: object with an ``items`` iterable; each item exposes ``price``.
        tax_rate: fractional tax rate applied to the subtotal. Defaults to
            0.08 (the previously hard-coded 8%), so existing callers are
            unaffected.

    Returns:
        The same form, with ``subtotal``, ``tax``, and ``total`` attributes set.
    """
    form.subtotal = sum(item.price for item in form.items)
    form.tax = form.subtotal * tax_rate
    form.total = form.subtotal + form.tax
    return form

Advanced Chaining Patterns

Conditional Routing

# Route based on computed properties
def classify_priority(request):
    """Tag the request as "high" or "normal" priority."""
    # High priority: big-ticket amount, or explicitly critical urgency.
    is_high = request.amount > 10000 or request.urgency == "critical"
    request.priority = "high" if is_high else "normal"
    return request
# High-value or critical requests take the escalation path.
(
    bridge.on(CustomerRequest)
    .map(enrich_customer_data)
    .map(classify_priority)
    .filter(lambda req: req.priority == "high")
    .map(escalate_to_manager)
    .broadcast(HighPriorityRequest)
)
# Parallel processing for different conditions: normal requests take the
# standard path alongside the high-priority route above.
(
    bridge.on(CustomerRequest)
    .map(classify_priority)
    .filter(lambda req: req.priority == "normal")
    .map(handle_standard_request)
    .broadcast(StandardRequest)
)

Multi-Stage Validation

async def validate_customer_info(data):
    """Verify the customer exists and run a basic credit check."""
    # Database lookup; bail out early when the customer is unknown.
    customer = await db.get_customer(data.customer_id)
    if not customer:
        data.errors.append("Customer not found")
        return data

    # Credit check: scores above 650 qualify.
    credit_score = await credit_api.get_score(customer.ssn)
    data.credit_approved = credit_score > 650

    return data
async def validate_inventory(data):
    """Record an error for every line item with insufficient stock."""
    for item in data.items:
        # Check product availability against current stock levels.
        available = await inventory.check_stock(item.product_id)
        if available < item.quantity:
            data.errors.append(f"Insufficient stock for {item.name}")

    return data
# Chain async validations: each filter drops orders that picked up errors
# in the preceding stage, so later (expensive) steps never run on bad data.
(
    bridge.on(OrderRequest)
    .map(validate_customer_info)
    .filter(lambda order: len(order.errors) == 0)
    .map(validate_inventory)
    .filter(lambda order: len(order.errors) == 0)
    .map(process_payment)
    .broadcast(OrderConfirmed)
)

Stream Processing Chains

# Process collections with chaining
async def analyze_conversation_chunks(context):
    """Yield a SentenceAnalysis for every sentence in the transcript."""
    transcript = context.get_full_transcript()

    for sentence in split_into_sentences(transcript):
        yield SentenceAnalysis(
            text=sentence,
            sentiment=await analyze_sentiment(sentence),
            entities=extract_entities(sentence),
        )
def aggregate_insights(analysis):
    """Combine sentence analyses into conversation insights."""
    overall = calculate_overall_sentiment(analysis.sentiment)
    return ConversationInsight(
        overall_sentiment=overall,
        key_entities=analysis.entities,
        conversation_type=classify_conversation(analysis),
    )
# Chain streaming with aggregation: stream() fans a single event out into
# many analyses, each of which flows through the rest of the chain.
(
    bridge.on(UserStoppedSpeaking)
    .stream(analyze_conversation_chunks)   # yields one analysis per sentence
    .map(aggregate_insights)               # each analysis aggregated
    .filter(lambda insight: insight.confidence > 0.8)
    .broadcast(ConversationInsight)
)

Error Handling in Chains

def safe_transform(transform_func):
    """Wrap a pipeline step so failures are recorded instead of raised.

    Any exception from ``transform_func`` is logged and appended to the data
    object's ``errors`` list (created on demand), and the original data is
    returned so a downstream ``.filter`` can drop it.

    Args:
        transform_func: a one-argument pipeline step.

    Returns:
        A wrapper with the same call signature as ``transform_func``.
    """
    import logging
    from functools import wraps

    # Use a named stdlib logger rather than relying on a module-global
    # ``logger`` that may not be defined where this helper is pasted.
    log = logging.getLogger(__name__)

    @wraps(transform_func)  # preserve the step's name for logs/debugging
    def wrapper(data):
        try:
            return transform_func(data)
        except Exception as e:  # broad by design: the pipeline must keep flowing
            log.error("Transform error: %s", e)
            data.errors = getattr(data, 'errors', [])
            data.errors.append(f"Processing error: {str(e)}")
            return data

    return wrapper
# Apply error handling to every step: the interleaved filters discard any
# item whose previous step recorded errors.
(
    bridge.on(DataSubmission)
    .map(safe_transform(parse_input))
    .filter(lambda d: not getattr(d, 'errors', []))
    .map(safe_transform(validate_business_rules))
    .filter(lambda d: not getattr(d, 'errors', []))
    .map(safe_transform(enrich_with_external_data))
    .broadcast(ProcessedData)
)

Early Exit Pattern

def should_stop_processing(data):
    """Determine if processing should stop early."""
    if data.status == "completed":
        return True
    if data.error is not None:
        return True
    # Fall through to the user's cancellation flag.
    return data.user_cancelled
# Early-exit route: cheap checks run first; expensive work is skipped
# whenever the exit predicate fires.
(
    bridge.on(ProcessingUpdate)
    .map(check_prerequisites)
    .exit(should_stop_processing)   # stop here if condition met
    .map(expensive_computation)     # only runs if exit condition false
    .map(finalize_results)
    .broadcast(ProcessingComplete)
)

Reusable Pipeline Components

class PipelineBuilder:
    """Helper for building reusable pipeline components."""

    @staticmethod
    def validation_pipeline(validators):
        """Create a validation pipeline from a list of validators.

        Each validator is mapped over the route, followed by a filter that
        drops items the validator marked invalid.
        """
        def build_route(bridge_route):
            route = bridge_route
            for check in validators:
                route = route.map(check).filter(lambda d: d.is_valid)
            return route

        return build_route

    @staticmethod
    def enrichment_pipeline(enrichers):
        """Create an enrichment pipeline from a list of enricher functions."""
        def build_route(bridge_route):
            route = bridge_route
            for enrich in enrichers:
                route = route.map(enrich)
            return route

        return build_route
# Use reusable components
customer_validation = PipelineBuilder.validation_pipeline(
    [validate_customer_id, validate_contact_info, validate_credit_status]
)
data_enrichment = PipelineBuilder.enrichment_pipeline(
    [add_customer_history, add_preferences, add_risk_score]
)

# Compose pipelines: validate first, then enrich, then announce readiness.
route = data_enrichment(customer_validation(bridge.on(CustomerOnboarding)))
route.broadcast(CustomerReady)

Best Practices

  1. Pure Functions: Use pure functions for transformations when possible
  2. Error Boundaries: Handle errors at appropriate pipeline stages
  3. Early Exit: Stop processing when conditions make continuation unnecessary
  4. Type Safety: Maintain consistent data types through the pipeline
  5. Logging: Log important transformations for debugging
  6. Performance: Consider async operations for I/O-bound steps
  7. Modularity: Create reusable pipeline components

Common Use Cases

  • Form Processing: Multi-step validation and transformation
  • Data Enrichment: Add context from multiple sources
  • Business Rule Engine: Apply sequential business logic
  • Content Analysis: Multi-stage text/voice analysis
  • Order Processing: Complex e-commerce workflows
  • Data Migration: Transform data through multiple stages

This pattern enables complex data processing through route composition.