class ConversationSummary(BaseModel):
    """Summary record describing one finished conversation.

    An instance is built by ``ConversationAggregator.aggregate_events`` when
    an ``EndCall`` event is processed for the conversation.
    """

    # Identifier of the conversation this summary describes.
    conversation_id: str

    # Length of the conversation, expressed in minutes.
    duration_minutes: int

    # Number of exchanges counted during the conversation (the aggregator
    # increments this once per UserTranscriptionReceived event).
    total_exchanges: int

    # Satisfaction label derived from the collected sentiment values.
    # NOTE(review): the set of possible labels is not visible here - confirm.
    customer_satisfaction: str

    # Topics gathered from TopicAnalysis events over the conversation.
    key_topics: list[str]

    # Resolution label derived from the conversation's recorded events.
    # NOTE(review): the set of possible labels is not visible here - confirm.
    resolution_status: str
class ConversationAggregator:
    """Collect per-conversation events and emit a summary when a call ends.

    State for each in-flight conversation is kept in ``conversation_data``,
    keyed by conversation id, and is removed once its summary has been built.

    NOTE(review): ``_calculate_duration``, ``_analyze_satisfaction`` and
    ``_determine_resolution`` are called by ``aggregate_events`` but are not
    defined in this class as shown; they must be supplied elsewhere (for
    example by a subclass) - confirm.
    """

    def __init__(self):
        """Start with no conversations being tracked."""
        # conversation id -> mutable tracking record with the keys
        # "start_time", "exchanges", "sentiments", "topics" and "events".
        self.conversation_data: dict[str, dict] = {}

    def aggregate_events(self, message) -> "ConversationSummary | None":
        """Fold one incoming event into its conversation's running state.

        Args:
            message: Object exposing the event to process as
                ``message.event``. The event's ``conversation_id`` attribute
                selects the conversation; events without that attribute all
                share the ``"default"`` key.

        Returns:
            A ``ConversationSummary`` when the event is an ``EndCall``
            (the conversation's tracking data is discarded afterwards);
            ``None`` for every other event.
        """
        event = message.event
        conv_id = getattr(event, "conversation_id", "default")

        # Create the tracking record the first time a conversation is seen;
        # the start time is therefore the arrival time of its first event.
        conv = self.conversation_data.get(conv_id)
        if conv is None:
            conv = self.conversation_data[conv_id] = {
                "start_time": datetime.now(),
                "exchanges": 0,
                "sentiments": [],
                "topics": set(),
                "events": [],
            }

        # Every event is recorded, including the terminating EndCall, so the
        # resolution helper sees the full history.
        conv["events"].append(event)

        if isinstance(event, UserTranscriptionReceived):
            conv["exchanges"] += 1
        elif isinstance(event, SentimentAnalysis):
            conv["sentiments"].append(event.sentiment)
        elif isinstance(event, TopicAnalysis):
            conv["topics"].update(event.topics)
        elif isinstance(event, EndCall):
            summary = ConversationSummary(
                conversation_id=conv_id,
                duration_minutes=self._calculate_duration(conv["start_time"]),
                total_exchanges=conv["exchanges"],
                customer_satisfaction=self._analyze_satisfaction(
                    conv["sentiments"]
                ),
                # Sort so the output is reproducible: iterating a set of
                # strings yields an order that can change between runs.
                key_topics=sorted(conv["topics"]),
                resolution_status=self._determine_resolution(conv["events"]),
            )
            # The conversation is over; release its tracking data.
            del self.conversation_data[conv_id]
            return summary

        return None  # No summary until the conversation ends.
# A single aggregator instance holds the per-conversation state used below.
aggregator = ConversationAggregator()

# Feed every event type the aggregator handles through aggregate_events.
# It returns None until an EndCall arrives, so the filter keeps only the
# finished summaries before they are passed on to broadcast().
(
    bridge.on([UserTranscriptionReceived, SentimentAnalysis, TopicAnalysis, EndCall])
    .map(aggregator.aggregate_events)
    .filter(lambda result: result is not None)
    .broadcast()
)