Event Transformation

Event transformation patterns enable converting one event type to another, enriching event data, and creating custom event flows for specialized processing.

Pattern Overview

This pattern enables:

  • Type Conversion: Transform built-in events to custom events
  • Data Enrichment: Add context and metadata to events
  • Event Splitting: Convert one event into multiple specialized events
  • Event Aggregation: Combine multiple events into summary events

Key Components

Events

  • Source Events: Original events to be transformed
  • Target Events: New events created from transformations
  • Custom Events: Domain-specific event types for your application

Routes

  • Transform Mapping: Use map() to convert one event into another, or stream() when a transformation yields several events
  • Broadcasting: Send transformed events through the system
  • Filtering: Apply transformations conditionally

Transformation Functions

  • Pure Transformations: Convert data without side effects
  • Enrichment: Add external data to events
  • Analysis: Extract insights and create derived events

Basic Transformation Example

from datetime import datetime

from pydantic import BaseModel

# Assumed to be defined elsewhere: bridge, analysis_bridge, sentiment_analyzer,
# process_sentiment_insight, and the UserTranscriptionReceived event.


# Custom events for sentiment analysis
class SentimentAnalysis(BaseModel):
    text: str
    sentiment: str
    confidence: float
    timestamp: datetime


class ConversationInsight(BaseModel):
    insight_type: str
    description: str
    confidence: float


# Transform user input to sentiment analysis
def analyze_sentiment(message) -> SentimentAnalysis:
    """Transform UserTranscriptionReceived to SentimentAnalysis."""
    transcription = message.event

    # Analyze sentiment (simplified)
    sentiment_result = sentiment_analyzer.analyze(transcription.content)

    return SentimentAnalysis(
        text=transcription.content,
        sentiment=sentiment_result.label,
        confidence=sentiment_result.score,
        timestamp=datetime.now(),
    )


# Route transformation
(
    bridge.on(UserTranscriptionReceived)
    .map(analyze_sentiment)
    .broadcast()
)

# Other agents can subscribe to sentiment analysis
analysis_bridge.on(SentimentAnalysis).map(process_sentiment_insight)
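
The example above leaves sentiment_analyzer undefined. For local experimentation, a keyword-based stand-in might look like the sketch below. KeywordSentimentAnalyzer, SentimentResult, and the word lists are hypothetical; the only thing taken from the example is the shape of the call, analyze(text) returning an object with label and score attributes.

```python
from dataclasses import dataclass


@dataclass
class SentimentResult:
    label: str
    score: float


class KeywordSentimentAnalyzer:
    """Toy analyzer that counts positive and negative keywords (hypothetical)."""

    POSITIVE = {"great", "thanks", "love", "good", "happy"}
    NEGATIVE = {"bad", "angry", "terrible", "broken", "cancel"}

    def analyze(self, text: str) -> SentimentResult:
        words = [w.strip(".,!?").lower() for w in text.split()]
        pos = sum(w in self.POSITIVE for w in words)
        neg = sum(w in self.NEGATIVE for w in words)
        if pos == neg:
            # No keywords at all, or an even split: report neutral.
            return SentimentResult(label="neutral", score=0.0)
        label = "positive" if pos > neg else "negative"
        # Score is the share of matched keywords that agree with the label.
        return SentimentResult(label=label, score=max(pos, neg) / (pos + neg))


sentiment_analyzer = KeywordSentimentAnalyzer()
result = sentiment_analyzer.analyze("Thanks, the new plan is great!")
print(result.label, result.score)
```

A real deployment would presumably swap this for a model-backed analyzer; keeping the same analyze() shape means analyze_sentiment would not need to change.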

Advanced Transformation Patterns

Multi-Target Transformation

from pydantic import BaseModel

# Assumed to be defined elsewhere: bridge, UserTranscriptionReceived,
# extract_customer_id, topic_classifier, sentiment_analyzer, risk_analyzer,
# and calculate_risk_level.


class CustomerProfile(BaseModel):
    customer_id: str
    interaction_type: str
    sentiment: str
    topics: list[str]


class RiskAssessment(BaseModel):
    customer_id: str
    risk_level: str
    risk_factors: list[str]


async def analyze_customer_interaction(message):
    """Transform user input into multiple analysis events."""
    user_input = message.event.content

    # Extract customer context
    customer_id = extract_customer_id(message)

    # Analyze interaction
    topics = await topic_classifier.classify(user_input)
    sentiment = await sentiment_analyzer.analyze(user_input)
    risk_factors = await risk_analyzer.assess(user_input, customer_id)

    # Yield multiple transformed events
    yield CustomerProfile(
        customer_id=customer_id,
        interaction_type="voice_call",
        sentiment=sentiment.label,
        topics=topics,
    )

    # Only emit a risk event when something was flagged
    if risk_factors:
        yield RiskAssessment(
            customer_id=customer_id,
            risk_level=calculate_risk_level(risk_factors),
            risk_factors=risk_factors,
        )


# Stream transformation to multiple event types
(
    bridge.on(UserTranscriptionReceived)
    .stream(analyze_customer_interaction)
    .broadcast()
)
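
Because a stream transformer like the one above is an ordinary async generator, it can be exercised without a bridge. The sketch below shows one way to do that with only the standard library. FakeEvent, FakeMessage, Profile, Risk, split_interaction, and collect are all hypothetical stand-ins, not part of any SDK.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeEvent:
    content: str


@dataclass
class FakeMessage:
    event: FakeEvent
    source: str = "test"


@dataclass
class Profile:
    text: str


@dataclass
class Risk:
    reason: str


async def split_interaction(message):
    """Stand-in stream transformer: always yields a Profile, sometimes a Risk."""
    text = message.event.content
    yield Profile(text=text)
    if "cancel" in text.lower():
        yield Risk(reason="cancellation mentioned")


async def collect(transformer, message):
    """Drain an async generator into a list so the output can be asserted on."""
    return [event async for event in transformer(message)]


events = asyncio.run(
    collect(split_interaction, FakeMessage(FakeEvent("I want to cancel")))
)
print([type(e).__name__ for e in events])
```

The same collect() helper could drive the real transformer in a unit test, provided its classifier and analyzer dependencies are replaced with test doubles.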

Event Aggregation

from datetime import datetime

from pydantic import BaseModel

# Assumed to be defined elsewhere: bridge, UserTranscriptionReceived, EndCall,
# SentimentAnalysis, TopicAnalysis, and the aggregator's private helpers
# (_calculate_duration, _analyze_satisfaction, _determine_resolution).


class ConversationSummary(BaseModel):
    conversation_id: str
    duration_minutes: int
    total_exchanges: int
    customer_satisfaction: str
    key_topics: list[str]
    resolution_status: str


class ConversationAggregator:
    def __init__(self):
        # Per-conversation tracking state, keyed by conversation ID
        self.conversation_data = {}

    def aggregate_events(self, message):
        """Aggregate various events into conversation summary."""
        event = message.event
        conv_id = getattr(event, "conversation_id", "default")

        # Initialize conversation tracking
        if conv_id not in self.conversation_data:
            self.conversation_data[conv_id] = {
                "start_time": datetime.now(),
                "exchanges": 0,
                "sentiments": [],
                "topics": set(),
                "events": [],
            }

        conv = self.conversation_data[conv_id]
        conv["events"].append(event)

        # Process different event types
        if isinstance(event, UserTranscriptionReceived):
            conv["exchanges"] += 1
        elif isinstance(event, SentimentAnalysis):
            conv["sentiments"].append(event.sentiment)
        elif isinstance(event, TopicAnalysis):
            conv["topics"].update(event.topics)
        elif isinstance(event, EndCall):
            # Generate summary when conversation ends
            summary = ConversationSummary(
                conversation_id=conv_id,
                duration_minutes=self._calculate_duration(conv["start_time"]),
                total_exchanges=conv["exchanges"],
                customer_satisfaction=self._analyze_satisfaction(
                    conv["sentiments"]
                ),
                key_topics=list(conv["topics"]),
                resolution_status=self._determine_resolution(conv["events"]),
            )

            # Clean up tracking data
            del self.conversation_data[conv_id]
            return summary

        return None  # No summary yet


aggregator = ConversationAggregator()

# Aggregate multiple event types
(
    bridge.on(
        [
            UserTranscriptionReceived,
            SentimentAnalysis,
            TopicAnalysis,
            EndCall,
        ]
    )
    .map(aggregator.aggregate_events)
    .filter(lambda summary: summary is not None)
    .broadcast()
)
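
The aggregator above calls three private helpers that are not shown. One possible shape for them, written as standalone functions so they can be tested in isolation, is sketched below. The rules are assumptions chosen for illustration: duration is rounded down to whole minutes, satisfaction is the most frequent sentiment label, and a conversation counts as resolved if any event carries a hypothetical resolved attribute set to True.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Optional


def calculate_duration(start_time: datetime, now: Optional[datetime] = None) -> int:
    """Whole minutes elapsed since start_time, rounded down."""
    elapsed = (now or datetime.now()) - start_time
    return int(elapsed.total_seconds() // 60)


def analyze_satisfaction(sentiments: list[str]) -> str:
    """Most common sentiment label, or "unknown" when none were recorded."""
    if not sentiments:
        return "unknown"
    return Counter(sentiments).most_common(1)[0][0]


def determine_resolution(events: list) -> str:
    """Hypothetical rule: resolved if any event has resolved=True."""
    resolved = any(getattr(e, "resolved", False) for e in events)
    return "resolved" if resolved else "unresolved"


start = datetime(2024, 1, 1, 12, 0, 0)
minutes = calculate_duration(start, start + timedelta(minutes=7, seconds=30))
print(minutes, analyze_satisfaction(["positive", "negative", "positive"]))
```

Accepting an optional now argument keeps the duration helper deterministic under test.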

Conditional Transformation

def transform_based_on_context(message):
    """Transform events differently based on context."""
    user_input = message.event
    context = get_conversation_context(message.source)

    # Different transformations based on conversation state
    if context.state == "authentication":
        return SecurityEvent(
            event_type="auth_attempt",
            input_text=user_input.content,
            security_level=assess_security_risk(user_input.content),
        )

    elif context.state == "form_filling":
        return FormFieldUpdate(
            field_name=context.current_field,
            field_value=user_input.content,
            validation_status=validate_field_input(
                context.current_field, user_input.content
            ),
        )

    elif context.state == "complaint_handling":
        return ComplaintAnalysis(
            complaint_text=user_input.content,
            severity=analyze_complaint_severity(user_input.content),
            category=classify_complaint_category(user_input.content),
        )

    # Default transformation
    return GeneralInquiry(
        inquiry_text=user_input.content,
        intent=classify_intent(user_input.content),
    )


(
    bridge.on(UserTranscriptionReceived)
    .map(transform_based_on_context)
    .broadcast()
)
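
As the number of conversation states grows, the if/elif chain above can become hard to maintain. One alternative, sketched here as a design option rather than a recommendation from the source, is a dispatch table that maps each state to a builder function. The event classes are reduced to dataclass stand-ins, and TRANSFORMS and transform_for_state are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class SecurityEvent:
    input_text: str


@dataclass
class GeneralInquiry:
    inquiry_text: str


def to_security_event(text: str) -> SecurityEvent:
    return SecurityEvent(input_text=text)


def to_general_inquiry(text: str) -> GeneralInquiry:
    return GeneralInquiry(inquiry_text=text)


# Map each conversation state to the function that builds its event.
TRANSFORMS: dict[str, Callable[[str], object]] = {
    "authentication": to_security_event,
}


def transform_for_state(state: str, text: str):
    """Look up the transformation for a state, falling back to the default."""
    return TRANSFORMS.get(state, to_general_inquiry)(text)


print(transform_for_state("authentication", "1234"))
print(transform_for_state("smalltalk", "hi"))
```

Adding a new state then means registering one more entry in the table instead of editing the branching logic.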

Real-time Enrichment

import asyncio
from datetime import datetime

# Assumed to be defined elsewhere: bridge, UserTranscriptionReceived,
# EnrichedUserInput, extract_customer_id, customer_api, product_api,
# and interaction_db.


async def enrich_with_external_data(message):
    """Enrich events with external API data."""
    user_input = message.event
    customer_id = extract_customer_id(message)

    # Fetch enrichment data in parallel
    (
        customer_data,
        product_data,
        interaction_history,
    ) = await asyncio.gather(
        customer_api.get_profile(customer_id),
        product_api.get_preferences(customer_id),
        interaction_db.get_recent_interactions(customer_id),
    )

    return EnrichedUserInput(
        original_content=user_input.content,
        customer_profile=customer_data,
        product_preferences=product_data,
        interaction_context=interaction_history,
        enrichment_timestamp=datetime.now(),
    )


(
    bridge.on(UserTranscriptionReceived)
    .map(enrich_with_external_data)
    .broadcast()
)
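
In the enrichment example above, a single failing or slow lookup would make the whole asyncio.gather call fail or stall. A minimal sketch of one mitigation follows: each lookup gets a timeout and a fallback value, so the enriched event can still be produced with partial data. fetch_with_fallback, get_profile, get_history, and the timeout values are hypothetical; only asyncio.gather and asyncio.wait_for are standard library calls.

```python
import asyncio


async def fetch_with_fallback(coro, timeout: float, default):
    """Await coro, returning default on timeout or any other failure."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except Exception:
        # asyncio.TimeoutError is an Exception subclass, so it lands here too.
        return default


async def get_profile(customer_id: str) -> dict:
    return {"id": customer_id, "tier": "gold"}  # stand-in for a fast API


async def get_history(customer_id: str) -> list:
    await asyncio.sleep(1.0)  # stand-in for a slow or hung API
    return ["call-1"]


async def enrich(customer_id: str) -> dict:
    # Both lookups run concurrently; each one degrades independently.
    profile, history = await asyncio.gather(
        fetch_with_fallback(get_profile(customer_id), 0.05, default=None),
        fetch_with_fallback(get_history(customer_id), 0.05, default=[]),
    )
    return {"profile": profile, "history": history}


enriched = asyncio.run(enrich("c-42"))
print(enriched)
```

Whether partial enrichment is acceptable depends on the consumer; some pipelines may prefer to drop the event instead.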

Event Chain Transformation

from datetime import datetime

# Assumed to be defined elsewhere: bridge, UserTranscriptionReceived, the event
# classes used below, analyze_user_input, determine_priority, and
# calculate_engagement.


def create_event_chain(initial_event):
    """Create a chain of related events from an initial event."""
    events = []

    # Base analysis
    analysis = analyze_user_input(initial_event)
    events.append(
        UserInputAnalysis(
            content=initial_event.content,
            intent=analysis.intent,
            entities=analysis.entities,
        )
    )

    # Intent-specific events
    if analysis.intent == "product_inquiry":
        events.append(
            ProductInterest(
                products=analysis.entities.get("products", []),
                interest_level=analysis.confidence,
            )
        )

    elif analysis.intent == "support_request":
        events.append(
            SupportTicket(
                category=analysis.entities.get("category", "general"),
                priority=determine_priority(analysis),
                description=initial_event.content,
            )
        )

    # Always create engagement metric
    events.append(
        EngagementMetric(
            interaction_type="user_input",
            engagement_score=calculate_engagement(analysis),
            timestamp=datetime.now(),
        )
    )

    return events


async def chain_transformer(message):
    """Async generator that yields each event in the chain."""
    event_chain = create_event_chain(message.event)
    for event in event_chain:
        yield event


(
    bridge.on(UserTranscriptionReceived)
    .stream(chain_transformer)
    .broadcast()
)

Best Practices

  1. Type Safety: Use Pydantic models for all custom events
  2. Documentation: Document what each transformation does and why
  3. Error Handling: Handle transformation failures gracefully
  4. Performance: Cache expensive operations like API calls
  5. Selective Processing: Use filters to avoid unnecessary transformations
  6. Event Naming: Use clear, domain-specific event names
  7. Versioning: Consider event schema versioning for evolving systems
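
For the versioning practice in the list above, one common approach is to carry an explicit version number in each event and upgrade older payloads at the boundary. The sketch below assumes a hypothetical history in which version 1 of a sentiment event used the key "score" and version 2 renamed it to "confidence". The names SentimentAnalysisV2 and upgrade_sentiment_payload are illustrative, and a dataclass stands in for a Pydantic model to keep the sketch dependency-free.

```python
from dataclasses import dataclass


@dataclass
class SentimentAnalysisV2:
    text: str
    sentiment: str
    confidence: float
    schema_version: int = 2


def upgrade_sentiment_payload(payload: dict) -> SentimentAnalysisV2:
    """Accept a v1 or v2 payload and return the current (v2) event.

    Payloads without a schema_version key are treated as v1.
    """
    version = payload.get("schema_version", 1)
    if version == 1:
        # v1 stored the confidence under the key "score".
        return SentimentAnalysisV2(
            text=payload["text"],
            sentiment=payload["sentiment"],
            confidence=payload["score"],
        )
    if version == 2:
        return SentimentAnalysisV2(
            text=payload["text"],
            sentiment=payload["sentiment"],
            confidence=payload["confidence"],
        )
    raise ValueError(f"unsupported schema_version: {version}")


old = {"text": "ok", "sentiment": "neutral", "score": 0.9}
event = upgrade_sentiment_payload(old)
print(event)
```

Rejecting unknown versions explicitly tends to surface producer and consumer mismatches earlier than silently accepting them.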

Common Use Cases

  • Analytics Pipeline: Transform user interactions into analytics events
  • Domain Events: Convert generic events to business domain events
  • Monitoring: Transform application events into monitoring/alerting events
  • Integration: Convert events for external system integration
  • State Machine: Transform events based on current system state
  • A/B Testing: Create different event streams for testing variants

Troubleshooting

Missing Transformations

  • Verify event type matching in .on() clauses
  • Check that transformation functions return correct types
  • Ensure that at least one other bridge subscribes to each broadcast event type

Performance Issues

  • Use async transformations for I/O operations
  • Cache frequently accessed external data
  • Consider batch processing for high-volume transformations
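
To illustrate the caching advice above, here is a minimal sketch of a time-based cache in front of an async lookup. AsyncTTLCache and load_profile are hypothetical names, and the 60-second lifetime is an arbitrary choice for the example.

```python
import asyncio
import time


class AsyncTTLCache:
    """Cache results of an async loader per key for ttl_seconds."""

    def __init__(self, loader, ttl_seconds: float):
        self._loader = loader
        self._ttl = ttl_seconds
        self._entries = {}  # key -> (expires_at, value)

    async def get(self, key):
        entry = self._entries.get(key)
        if entry is not None and entry[0] > time.monotonic():
            return entry[1]  # still fresh: skip the loader
        value = await self._loader(key)
        self._entries[key] = (time.monotonic() + self._ttl, value)
        return value


calls = []


async def load_profile(customer_id: str) -> dict:
    calls.append(customer_id)  # record each real lookup
    return {"id": customer_id}


async def demo():
    cache = AsyncTTLCache(load_profile, ttl_seconds=60)
    first = await cache.get("c-1")
    second = await cache.get("c-1")  # served from the cache
    return first, second


first, second = asyncio.run(demo())
print(len(calls))
```

This sketch does not deduplicate concurrent misses for the same key, so two simultaneous requests may both hit the loader; a per-key lock or shared future would be needed to prevent that.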

Data Quality

  • Validate transformed data with Pydantic models
  • Add logging to track transformation success/failure rates
  • Monitor for missing or corrupted event data
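
Tracking success and failure rates, as suggested above, can be done with a small wrapper around each transformation. The following is a sketch under stated assumptions: tracked, stats, and parse_amount are hypothetical names, and a failed transformation returns None on the assumption that a downstream filter drops None values, as in the aggregation example.

```python
import functools
import logging
from collections import Counter

logger = logging.getLogger("transformations")
stats = Counter()


def tracked(transform):
    """Wrap a transformation so outcomes are counted and failures are logged."""

    @functools.wraps(transform)
    def wrapper(message):
        try:
            result = transform(message)
        except Exception:
            stats[f"{transform.__name__}.failure"] += 1
            logger.exception("transformation %s failed", transform.__name__)
            return None  # lets a downstream filter drop the failed event
        stats[f"{transform.__name__}.success"] += 1
        return result

    return wrapper


@tracked
def parse_amount(message: dict) -> float:
    return float(message["amount"])


results = [parse_amount({"amount": "12.5"}), parse_amount({"amount": "n/a"})]
print(results, dict(stats))
```

The counters could be exported to whatever metrics system is already in use; a plain Counter is used here only to keep the example self-contained.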

This pattern is essential for building event-driven systems that adapt events to different processing contexts.