Event Transformation

Event transformation patterns convert one event type into another, enrich event data, and create custom event flows for specialized processing.

Pattern Overview

This pattern enables:

  • Type Conversion: Transform built-in events to custom events
  • Data Enrichment: Add context and metadata to events
  • Event Splitting: Convert one event into multiple specialized events
  • Event Aggregation: Combine multiple events into summary events
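The four capabilities above can be sketched as plain functions, independent of any routing layer. This is a minimal illustration that uses only the standard library; the event classes (Transcript, Greeting, WordSeen, WordCount) and the function names are hypothetical and are not part of the framework.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Transcript:  # hypothetical stand-in for a built-in source event
    text: str


@dataclass
class Greeting:  # hypothetical custom target event
    text: str
    language: str = "unknown"


@dataclass
class WordSeen:  # hypothetical event produced by splitting
    word: str


@dataclass
class WordCount:  # hypothetical summary event
    total: int


def convert(event: Transcript) -> Greeting:
    """Type conversion: one source event becomes one target event."""
    return Greeting(text=event.text)


def enrich(event: Greeting, language: str) -> Greeting:
    """Data enrichment: return a copy that carries extra context."""
    return Greeting(text=event.text, language=language)


def split(event: Transcript) -> Iterator[WordSeen]:
    """Event splitting: one event fans out into several events."""
    for word in event.text.split():
        yield WordSeen(word=word)


def aggregate(events: Iterable[WordSeen]) -> WordCount:
    """Event aggregation: many events collapse into one summary event."""
    return WordCount(total=sum(1 for _ in events))


source = Transcript(text="hello there friend")
greeting = enrich(convert(source), language="en")
words = list(split(source))
summary = aggregate(words)
```

Keeping each step a small function of this shape makes it easy to test in isolation before it is attached to a route.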

Key Components

Events

  • Source Events: Original events to be transformed
  • Target Events: New events created from transformations
  • Custom Events: Domain-specific event types for your application

Routes

  • Transform Mapping: Use map() to convert event types
  • Broadcasting: Send transformed events through the system
  • Filtering: Apply transformations conditionally

Transformation Functions

  • Pure Transformations: Convert data without side effects
  • Enrichment: Add external data to events
  • Analysis: Extract insights and create derived events

Basic Transformation Example

from datetime import datetime

from pydantic import BaseModel

# NOTE: bridge, analysis_bridge, UserTranscriptionReceived, sentiment_analyzer,
# and process_sentiment_insight are assumed to be provided by the surrounding
# application; they are not defined in this example.

# Custom events for sentiment analysis
class SentimentAnalysis(BaseModel):
    text: str
    sentiment: str
    confidence: float
    timestamp: datetime

class ConversationInsight(BaseModel):
    insight_type: str
    description: str
    confidence: float

# Transform user input to sentiment analysis
def analyze_sentiment(message) -> SentimentAnalysis:
    """Transform UserTranscriptionReceived to SentimentAnalysis."""
    transcription = message.event

    # Analyze sentiment (simplified)
    sentiment_result = sentiment_analyzer.analyze(transcription.content)

    return SentimentAnalysis(
        text=transcription.content,
        sentiment=sentiment_result.label,
        confidence=sentiment_result.score,
        timestamp=datetime.now(),
    )

# Route transformation
(
    bridge.on(UserTranscriptionReceived)
    .map(analyze_sentiment)
    .broadcast()
)

# Other agents can subscribe to sentiment analysis
analysis_bridge.on(SentimentAnalysis).map(process_sentiment_insight)

Advanced Transformation Patterns

Multi-Target Transformation

from pydantic import BaseModel

class CustomerProfile(BaseModel):
    customer_id: str
    interaction_type: str
    sentiment: str
    topics: list[str]

class RiskAssessment(BaseModel):
    customer_id: str
    risk_level: str
    risk_factors: list[str]

async def analyze_customer_interaction(message):
    """Transform user input into multiple analysis events."""
    user_input = message.event.content

    # Extract customer context
    customer_id = extract_customer_id(message)

    # Analyze interaction
    topics = await topic_classifier.classify(user_input)
    sentiment = await sentiment_analyzer.analyze(user_input)
    risk_factors = await risk_analyzer.assess(user_input, customer_id)

    # Yield multiple transformed events
    yield CustomerProfile(
        customer_id=customer_id,
        interaction_type="voice_call",
        sentiment=sentiment.label,
        topics=topics,
    )

    # Emit a risk event only when at least one risk factor was found
    if risk_factors:
        yield RiskAssessment(
            customer_id=customer_id,
            risk_level=calculate_risk_level(risk_factors),
            risk_factors=risk_factors,
        )

# Stream transformation to multiple event types
(
    bridge.on(UserTranscriptionReceived)
    .stream(analyze_customer_interaction)
    .broadcast()
)

Event Aggregation

from datetime import datetime

from pydantic import BaseModel

class ConversationSummary(BaseModel):
    conversation_id: str
    duration_minutes: int
    total_exchanges: int
    customer_satisfaction: str
    key_topics: list[str]
    resolution_status: str

class ConversationAggregator:
    # The helper methods _calculate_duration, _analyze_satisfaction, and
    # _determine_resolution are application-specific and omitted here.

    def __init__(self):
        # Per-conversation state, keyed by conversation ID
        self.conversation_data = {}

    def aggregate_events(self, message):
        """Aggregate various events into conversation summary."""
        event = message.event
        conv_id = getattr(event, 'conversation_id', 'default')

        # Initialize conversation tracking
        if conv_id not in self.conversation_data:
            self.conversation_data[conv_id] = {
                'start_time': datetime.now(),
                'exchanges': 0,
                'sentiments': [],
                'topics': set(),
                'events': [],
            }

        conv = self.conversation_data[conv_id]
        conv['events'].append(event)

        # Process different event types
        if isinstance(event, UserTranscriptionReceived):
            conv['exchanges'] += 1
        elif isinstance(event, SentimentAnalysis):
            conv['sentiments'].append(event.sentiment)
        elif isinstance(event, TopicAnalysis):
            conv['topics'].update(event.topics)
        elif isinstance(event, EndCall):
            # Generate summary when conversation ends
            summary = ConversationSummary(
                conversation_id=conv_id,
                duration_minutes=self._calculate_duration(conv['start_time']),
                total_exchanges=conv['exchanges'],
                customer_satisfaction=self._analyze_satisfaction(conv['sentiments']),
                key_topics=list(conv['topics']),
                resolution_status=self._determine_resolution(conv['events']),
            )

            # Clean up tracking data
            del self.conversation_data[conv_id]
            return summary

        return None  # No summary yet

aggregator = ConversationAggregator()

# Aggregate multiple event types
(
    bridge.on([UserTranscriptionReceived, SentimentAnalysis, TopicAnalysis, EndCall])
    .map(aggregator.aggregate_events)
    .filter(lambda summary: summary is not None)
    .broadcast()
)

Conditional Transformation

def transform_based_on_context(message):
    """Transform events differently based on context."""
    user_input = message.event
    context = get_conversation_context(message.source)

    # Different transformations based on conversation state
    if context.state == "authentication":
        return SecurityEvent(
            event_type="auth_attempt",
            input_text=user_input.content,
            security_level=assess_security_risk(user_input.content),
        )

    elif context.state == "form_filling":
        return FormFieldUpdate(
            field_name=context.current_field,
            field_value=user_input.content,
            validation_status=validate_field_input(
                context.current_field,
                user_input.content,
            ),
        )

    elif context.state == "complaint_handling":
        return ComplaintAnalysis(
            complaint_text=user_input.content,
            severity=analyze_complaint_severity(user_input.content),
            category=classify_complaint_category(user_input.content),
        )

    # Default transformation
    return GeneralInquiry(
        inquiry_text=user_input.content,
        intent=classify_intent(user_input.content),
    )

(
    bridge.on(UserTranscriptionReceived)
    .map(transform_based_on_context)
    .broadcast()
)

Real-time Enrichment

import asyncio
from datetime import datetime

async def enrich_with_external_data(message):
    """Enrich events with external API data."""
    user_input = message.event
    customer_id = extract_customer_id(message)

    # Fetch enrichment data in parallel; gather() returns results in call order
    customer_data, product_data, interaction_history = await asyncio.gather(
        customer_api.get_profile(customer_id),
        product_api.get_preferences(customer_id),
        interaction_db.get_recent_interactions(customer_id),
    )

    return EnrichedUserInput(
        original_content=user_input.content,
        customer_profile=customer_data,
        product_preferences=product_data,
        interaction_context=interaction_history,
        enrichment_timestamp=datetime.now(),
    )

(
    bridge.on(UserTranscriptionReceived)
    .map(enrich_with_external_data)
    .broadcast()
)

Event Chain Transformation

from datetime import datetime

def create_event_chain(initial_event):
    """Create a chain of related events from an initial event."""
    events = []

    # Base analysis
    analysis = analyze_user_input(initial_event)
    events.append(UserInputAnalysis(
        content=initial_event.content,
        intent=analysis.intent,
        entities=analysis.entities,
    ))

    # Intent-specific events
    if analysis.intent == "product_inquiry":
        events.append(ProductInterest(
            products=analysis.entities.get('products', []),
            interest_level=analysis.confidence,
        ))

    elif analysis.intent == "support_request":
        events.append(SupportTicket(
            category=analysis.entities.get('category', 'general'),
            priority=determine_priority(analysis),
            description=initial_event.content,
        ))

    # Always create engagement metric
    events.append(EngagementMetric(
        interaction_type="user_input",
        engagement_score=calculate_engagement(analysis),
        timestamp=datetime.now(),
    ))

    return events

async def chain_transformer(message):
    """Async generator that yields each event in the chain, in order."""
    event_chain = create_event_chain(message.event)
    for event in event_chain:
        yield event

(
    bridge.on(UserTranscriptionReceived)
    .stream(chain_transformer)
    .broadcast()
)

Best Practices

  1. Type Safety: Use Pydantic models for all custom events
  2. Documentation: Document what each transformation does and why
  3. Error Handling: Handle transformation failures gracefully
  4. Performance: Cache the results of expensive operations such as API calls
  5. Selective Processing: Use filters to avoid unnecessary transformations
  6. Event Naming: Use clear, domain-specific event names
  7. Versioning: Consider event schema versioning for evolving systems
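Several of these practices can be combined in a few lines. The sketch below is one possible approach and is not framework code: safe_transform, lookup_region, and to_region_event are hypothetical names, and the schema_version field is an assumed convention for versioning. A failing transformation is logged and yields None rather than raising; how None results are dropped afterwards depends on the routing layer, which this sketch does not model.

```python
import functools
import logging
from typing import Callable, Optional, TypeVar

logger = logging.getLogger("transformations")

In = TypeVar("In")
Out = TypeVar("Out")


def safe_transform(fn: Callable[[In], Out]) -> Callable[[In], Optional[Out]]:
    """Wrap a transformation so a failure yields None instead of raising."""

    @functools.wraps(fn)
    def wrapper(message: In) -> Optional[Out]:
        try:
            return fn(message)
        except Exception:
            # Log the traceback, then signal "no event" to the caller
            logger.exception("transformation %s failed", fn.__name__)
            return None

    return wrapper


@functools.lru_cache(maxsize=1024)
def lookup_region(customer_id: str) -> str:
    """Hypothetical expensive lookup; results are cached per customer_id."""
    return "emea" if customer_id.startswith("e") else "amer"


@safe_transform
def to_region_event(customer_id: str) -> dict:
    """Build a versioned event payload for a customer."""
    if not customer_id:
        raise ValueError("customer_id is empty")
    return {
        "schema_version": 1,  # assumed versioning convention
        "customer_id": customer_id,
        "region": lookup_region(customer_id),
    }


first = to_region_event("e42")   # cache miss
second = to_region_event("e42")  # served from the cache
failed = to_region_event("")     # logged, returns None
```

Note that functools.lru_cache only works for synchronous functions with hashable arguments; async lookups need a different cache.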

Common Use Cases

  • Analytics Pipeline: Transform user interactions into analytics events
  • Domain Events: Convert generic events to business domain events
  • Monitoring: Transform application events into monitoring/alerting events
  • Integration: Convert events for external system integration
  • State Machine: Transform events based on current system state
  • A/B Testing: Create different event streams for testing variants

Troubleshooting

Missing Transformations

  • Verify event type matching in .on() clauses
  • Check that transformation functions return correct types
  • Ensure that at least one other bridge subscribes to each broadcast event type
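One way to catch a transformation that returns the wrong type is to assert the return type at the point of transformation, so the mismatch surfaces as an error instead of as an event that no subscriber ever receives. The decorator below is a hypothetical debugging aid, not part of the framework; Summary is an illustrative event class.

```python
import functools
from dataclasses import dataclass
from typing import Any, Callable, Type


def returns(expected: Type) -> Callable:
    """Decorator that fails loudly when a transformation returns the wrong type."""

    def decorate(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(message: Any) -> Any:
            result = fn(message)
            # None is allowed so that "no event yet" results still pass through
            if result is not None and not isinstance(result, expected):
                raise TypeError(
                    f"{fn.__name__} returned {type(result).__name__}, "
                    f"expected {expected.__name__}"
                )
            return result

        return wrapper

    return decorate


@dataclass
class Summary:  # hypothetical target event
    text: str


@returns(Summary)
def good_transform(message: str) -> Summary:
    return Summary(text=message)


@returns(Summary)
def bad_transform(message: str):
    # Returns a dict, so a subscriber waiting for Summary would never match
    return {"text": message}


ok = good_transform("hi")
try:
    bad_transform("hi")
    error = None
except TypeError as exc:
    error = str(exc)
```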

Performance Issues

  • Use async transformations for I/O operations
  • Cache frequently accessed external data
  • Consider batch processing for high-volume transformations
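For async lookups, a small time-based cache can stand in front of the external call. The following is a minimal sketch under stated assumptions: TTLCache and fetch_profile are hypothetical names, and the cache does not deduplicate concurrent requests for the same key, which a production version would probably want.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List, Tuple


class TTLCache:
    """Cache the results of an async lookup for ttl_seconds per key."""

    def __init__(
        self,
        fetch: Callable[[str], Awaitable[Any]],
        ttl_seconds: float = 60.0,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._entries: Dict[str, Tuple[float, Any]] = {}

    async def get(self, key: str) -> Any:
        now = time.monotonic()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]  # still fresh, skip the external call
        value = await self._fetch(key)
        self._entries[key] = (now, value)
        return value


calls: List[str] = []  # records which keys actually reached the "API"


async def fetch_profile(customer_id: str) -> dict:
    """Hypothetical slow API call."""
    calls.append(customer_id)
    await asyncio.sleep(0)
    return {"customer_id": customer_id, "tier": "gold"}


async def demo() -> list:
    cache = TTLCache(fetch_profile, ttl_seconds=60.0)
    return [
        await cache.get("c1"),
        await cache.get("c1"),  # cache hit, no second fetch
        await cache.get("c2"),
    ]


results = asyncio.run(demo())
```

An enrichment function could call cache.get(customer_id) in place of the direct API call, so repeated inputs from the same customer within the TTL window trigger only one fetch.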

Data Quality

  • Validate transformed data with Pydantic models
  • Add logging to track transformation success/failure rates
  • Monitor for missing or corrupted event data
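The checks above can be sketched together: validate on construction, count successes and failures, and log each dropped event. To keep the sketch runnable with only the standard library, a dataclass with a __post_init__ check stands in for a Pydantic model with field constraints; SentimentResult, build_result, and the stats counter are hypothetical names.

```python
import logging
from collections import Counter
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("transformations.quality")
stats: Counter = Counter()  # running success/failure counts


@dataclass
class SentimentResult:
    """Stand-in for a Pydantic model; validation runs in __post_init__."""

    text: str
    confidence: float

    def __post_init__(self) -> None:
        if not self.text.strip():
            raise ValueError("text must not be empty")
        if not 0.0 <= self.confidence <= 1.0:
            raise ValueError("confidence must be within [0, 1]")


def build_result(text: str, confidence: float) -> Optional[SentimentResult]:
    """Return a validated event, or None when the data is unusable."""
    try:
        result = SentimentResult(text=text, confidence=confidence)
    except ValueError as exc:
        stats["failed"] += 1
        logger.warning("dropped event: %s", exc)
        return None
    stats["ok"] += 1
    return result


good = build_result("great service", 0.93)
blank = build_result("   ", 0.5)
out_of_range = build_result("fine", 1.7)
```

With Pydantic, the same structure applies: catch pydantic.ValidationError around model construction and update the counters in the same way.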

This pattern lets an event-driven system adapt the same source events to different processing contexts.