Chained Processing

Chained processing lets you transform data through sequential route operations.

Pattern Overview

This pattern:

  • Sequential Operations: Chain multiple transformations
  • Data Validation: Filter and validate data at each step
  • Conditional Logic: Route data based on conditions
  • Pipeline Composition: Build reusable components

Key Components

Routes

  • map(): Transform data through functions
  • filter(): Conditionally continue processing
  • stream(): Process generators that yield multiple values
  • Pipeline Chaining: Link operations for complex workflows

Processing Functions

  • Pure Functions: Transform data without side effects
  • Async Support: Handle I/O operations in the pipeline
  • Error Handling: Graceful failure handling at each step
  • Type Safety: Maintain type consistency through transformations

Basic Example

# Multi-step form processing pipeline: parse -> validate -> total -> confirm.
(
    bridge.on(FormSubmitted)
    .map(parse_form_data)
    .filter(lambda form: form.is_complete)
    .map(validate_fields)
    .filter(lambda form: form.is_valid)
    .map(calculate_totals)
    .map(generate_confirmation)
    .broadcast(FormProcessed)
)
12
13
# Helper functions for each step
def parse_form_data(message):
    """Lift the raw submission payload out of the event into a FormData."""
    payload = message.event.form_data
    return FormData(
        name=payload.get("name"),
        email=payload.get("email"),
        items=payload.get("items", []),
    )
22
23
def validate_fields(form):
    """Run field-level checks, recording failure messages on form.errors.

    Sets ``form.is_valid`` so downstream ``filter`` steps can drop bad forms.
    """
    form.errors = []

    # Each check is (failed?, message); order determines message order.
    checks = (
        (not form.name, "Name is required"),
        (not is_valid_email(form.email), "Invalid email format"),
    )
    for failed, message in checks:
        if failed:
            form.errors.append(message)

    form.is_valid = not form.errors
    return form
34
35
def calculate_totals(form):
    """Attach subtotal, 8% tax, and grand total to the form in place."""
    prices = [entry.price for entry in form.items]
    form.subtotal = sum(prices)
    form.tax = form.subtotal * 0.08
    form.total = form.subtotal + form.tax
    return form

Advanced Chaining Patterns

Conditional Routing

# Route based on computed properties
def classify_priority(request):
    """Tag the request with a "high" or "normal" processing priority."""
    # Large amounts short-circuit before urgency is ever inspected,
    # matching the original if/elif ordering.
    is_high = request.amount > 10000 or request.urgency == "critical"
    request.priority = "high" if is_high else "normal"
    return request
10
11
# High-priority requests are enriched, classified, and escalated.
(
    bridge.on(CustomerRequest)
    .map(enrich_customer_data)
    .map(classify_priority)
    .filter(lambda req: req.priority == "high")
    .map(escalate_to_manager)
    .broadcast(HighPriorityRequest)
)

# A parallel route handles the remaining, normal-priority requests.
(
    bridge.on(CustomerRequest)
    .map(classify_priority)
    .filter(lambda req: req.priority == "normal")
    .map(handle_standard_request)
    .broadcast(StandardRequest)
)

Multi-Stage Validation

async def validate_customer_info(data):
    """Verify the customer exists, then run a credit check on them."""
    customer = await db.get_customer(data.customer_id)
    if not customer:
        # Unknown customer: record the error and skip the credit lookup.
        data.errors.append("Customer not found")
        return data

    # Credit check against the external scoring API; 650 is the cutoff.
    score = await credit_api.get_score(customer.ssn)
    data.credit_approved = score > 650

    return data
13
14
async def validate_inventory(data):
    """Record an error for every line item that is short on stock."""
    for item in data.items:
        on_hand = await inventory.check_stock(item.product_id)
        if on_hand < item.quantity:
            data.errors.append(f"Insufficient stock for {item.name}")

    return data
23
24
# Chain async validations; each filter halts the pipeline on recorded errors.
(
    bridge.on(OrderRequest)
    .map(validate_customer_info)
    .filter(lambda order: len(order.errors) == 0)
    .map(validate_inventory)
    .filter(lambda order: len(order.errors) == 0)
    .map(process_payment)
    .broadcast(OrderConfirmed)
)

Stream Processing Chains

# Process collections with chaining
async def analyze_conversation_chunks(context):
    """Yield a SentenceAnalysis for each sentence of the transcript."""
    full_text = context.get_full_transcript()

    for sentence in split_into_sentences(full_text):
        # Keyword arguments evaluate left to right, so sentiment is
        # awaited before entities are extracted — same as the original.
        yield SentenceAnalysis(
            text=sentence,
            sentiment=await analyze_sentiment(sentence),
            entities=extract_entities(sentence),
        )
14
15
def aggregate_insights(analysis):
    """Build a ConversationInsight from one sentence-level analysis."""
    overall = calculate_overall_sentiment(analysis.sentiment)
    return ConversationInsight(
        overall_sentiment=overall,
        key_entities=analysis.entities,
        conversation_type=classify_conversation(analysis),
    )
25
26
# Stream per-sentence analyses, aggregate each, keep only confident insights.
(
    bridge.on(UserStoppedSpeaking)
    .stream(analyze_conversation_chunks)  # yields one analysis per sentence
    .map(aggregate_insights)  # each yielded analysis is aggregated
    .filter(lambda insight: insight.confidence > 0.8)
    .broadcast(ConversationInsight)
)

Error Handling in Chains

def safe_transform(transform_func):
    """Wrap a pipeline step so failures are recorded instead of raised.

    On success the wrapped function behaves exactly like ``transform_func``.
    On any exception the error is logged, appended to ``data.errors``
    (creating the list when absent), and the original ``data`` is returned so
    a downstream ``filter`` can drop it.
    """
    from functools import wraps  # local import keeps the snippet self-contained

    # Without wraps(), every step would report its name as "wrapper" in
    # logs and tracebacks, making pipeline debugging much harder.
    @wraps(transform_func)
    def wrapper(data):
        try:
            return transform_func(data)
        except Exception as e:
            logger.error(f"Transform error: {e}")
            data.errors = getattr(data, "errors", [])
            data.errors.append(f"Processing error: {str(e)}")
            return data

    return wrapper
14
15
16# Apply error handling to pipeline
# Every step is wrapped; items are dropped as soon as errors accumulate.
(
    bridge.on(DataSubmission)
    .map(safe_transform(parse_input))
    .filter(lambda d: not getattr(d, "errors", []))
    .map(safe_transform(validate_business_rules))
    .filter(lambda d: not getattr(d, "errors", []))
    .map(safe_transform(enrich_with_external_data))
    .broadcast(ProcessedData)
)

Early Exit Pattern

def should_stop_processing(data):
    """Return a truthy value when further pipeline work would be wasted."""
    if data.status == "completed":
        return True
    if data.error is not None:
        return True
    # Falls through to the flag itself, preserving the original `or`
    # chain's behavior of returning data.user_cancelled's own value.
    return data.user_cancelled
8
9
(
    bridge.on(ProcessingUpdate)
    .map(check_prerequisites)
    .exit(should_stop_processing)  # stop here if the exit condition is met
    .map(expensive_computation)  # reached only when processing continues
    .map(finalize_results)
    .broadcast(ProcessingComplete)
)

Reusable Pipeline Components

class PipelineBuilder:
    """Factories that turn lists of step functions into route transformers."""

    @staticmethod
    def validation_pipeline(validators):
        """Return a callable chaining a map+filter pair per validator."""

        def apply(route):
            for check in validators:
                route = route.map(check).filter(lambda d: d.is_valid)
            return route

        return apply

    @staticmethod
    def enrichment_pipeline(enrichers):
        """Return a callable chaining one map step per enricher function."""

        def apply(route):
            for enrich in enrichers:
                route = route.map(enrich)
            return route

        return apply
27
28
# Build reusable components from the factory helpers above.
customer_validation = PipelineBuilder.validation_pipeline(
    [
        validate_customer_id,
        validate_contact_info,
        validate_credit_status,
    ]
)

data_enrichment = PipelineBuilder.enrichment_pipeline(
    [add_customer_history, add_preferences, add_risk_score]
)

# Compose the pieces into a single onboarding route.
route = bridge.on(CustomerOnboarding)
route = customer_validation(route)
route = data_enrichment(route)
route.broadcast(CustomerReady)

Best Practices

  1. Pure Functions: Use pure functions for transformations when possible
  2. Error Boundaries: Handle errors at appropriate pipeline stages
  3. Early Exit: Stop processing when conditions make continuation unnecessary
  4. Type Safety: Maintain consistent data types through the pipeline
  5. Logging: Log important transformations for debugging
  6. Performance: Consider async operations for I/O-bound steps
  7. Modularity: Create reusable pipeline components

Common Use Cases

  • Form Processing: Multi-step validation and transformation
  • Data Enrichment: Add context from multiple sources
  • Business Rule Engine: Apply sequential business logic
  • Content Analysis: Multi-stage text/voice analysis
  • Order Processing: Complex e-commerce workflows
  • Data Migration: Transform data through multiple stages

This pattern enables complex data processing through route composition.