Interruption Handling

Interruption handling allows agents to gracefully stop generating responses when the user starts speaking, creating natural conversational flow.

Pattern Overview

This pattern enables:

  • Natural Conversation: User can interrupt agent at any time
  • Graceful Cancellation: Agent stops generating without errors
  • Resource Cleanup: Proper cleanup of ongoing operations
  • Seamless Recovery: Agent can resume processing new user input

Key Components

Events

  • UserStartedSpeaking: Triggers interruption of ongoing generation
  • UserStoppedSpeaking: Resumes normal processing flow
  • AgentGenerationComplete: Signals when generation finishes

Routes

  • interrupt_on(): Defines which events should cancel the current operation
  • Interrupt Handlers: Custom functions to handle cancellation cleanup
  • Task Cancellation: Automatic cancellation of async tasks

Nodes

  • Interrupt Awareness: Nodes handle asyncio.CancelledError gracefully
  • Cleanup Logic: Implement interrupt handlers for resource cleanup
  • State Recovery: Maintain consistent state after interruptions

Basic Example

1# Basic interruption setup
2(
3 bridge.on(UserStoppedSpeaking)
4 .interrupt_on(UserStartedSpeaking)
5 .stream(node.generate)
6 .broadcast()
7)
8
9
10# With custom interrupt handler
11async def handle_interrupt(message):
12 logger.info("User interrupted, stopping generation")
13 # Perform any needed cleanup
14 await cleanup_resources()
15
16
17(
18 bridge.on(UserStoppedSpeaking)
19 .interrupt_on(UserStartedSpeaking, handler=handle_interrupt)
20 .stream(node.generate)
21 .broadcast()
22)

Node-Level Interruption Handling

1class InterruptAwareNode(ReasoningNode):
2 def on_interrupt_generate(self, message):
3 """Called when generation is interrupted."""
4 logger.info("Generation interrupted by user")
5 # Cleanup streaming resources, cancel API calls, etc.
6
7 async def process_context(self, context):
8 try:
9 # Stream response chunks
10 async for chunk in self.llm_client.generate_stream(
11 messages
12 ):
13 yield AgentResponse(content=chunk.text)
14
15 except asyncio.CancelledError:
16 # Handle cancellation gracefully
17 logger.info(
18 "Generation cancelled due to user interruption"
19 )
20 # Perform any final cleanup
21 raise # Re-raise to complete cancellation
22
23
24# Connect interrupt handler in route
25(
26 bridge.on(UserStoppedSpeaking)
27 .interrupt_on(
28 UserStartedSpeaking, handler=node.on_interrupt_generate
29 )
30 .stream(node.generate)
31 .broadcast()
32)

Advanced Patterns

Conditional Interruption

1# Only allow interruption after agent has spoken for a minimum time
2def can_interrupt(message):
3 return time.time() - generation_start_time > MIN_SPEAK_TIME
4
5
6(
7 bridge.on(UserStoppedSpeaking)
8 .interrupt_on(UserStartedSpeaking, condition=can_interrupt)
9 .stream(node.generate)
10 .broadcast()
11)

Multiple Interrupt Events

1# Interrupt on multiple event types
2(
3 bridge.on(UserStoppedSpeaking)
4 .interrupt_on([UserStartedSpeaking, EmergencyStop, AgentHandoff])
5 .stream(node.generate)
6 .broadcast()
7)

State Management During Interruptions

1class StatefulInterruptNode(ReasoningNode):
2 def __init__(self):
3 super().__init__()
4 self.generation_state = None
5
6 def on_interrupt_generate(self, message):
7 # Save state for potential recovery
8 self.generation_state = {
9 "interrupted_at": time.time(),
10 "partial_response": self.current_response,
11 "context_snapshot": self.context.copy(),
12 }
13 logger.info("Saved generation state for recovery")
14
15 async def process_context(self, context):
16 try:
17 # Check if resuming from interruption
18 if self.generation_state:
19 logger.info("Resuming from previous interruption")
20 # Potentially use saved state
21 self.generation_state = None
22
23 # Normal generation flow
24 async for chunk in self.generate_response(context):
25 yield AgentResponse(content=chunk)
26
27 except asyncio.CancelledError:
28 logger.info("Generation cancelled")
29 raise

Best Practices

  1. Always Handle CancelledError: Nodes should gracefully handle task cancellation
  2. Cleanup Resources: Use interrupt handlers to cleanup API calls, files, connections
  3. Maintain State Consistency: Ensure node state remains valid after interruptions
  4. Log Interruptions: Track interruption patterns for debugging and optimization
  5. Fast Interruption Response: Keep interrupt handlers lightweight and fast
  6. Recovery Planning: Consider how to handle partial responses and state recovery

Common Use Cases

  • Voice Conversations: Natural turn-taking in voice interactions
  • Long Responses: Allow interruption of lengthy agent responses
  • Emergency Stops: Immediate cancellation for safety or escalation
  • Context Switches: Interrupt current task when user changes topic
  • Multi-Agent Handoffs: Cancel current agent when transferring to another

Troubleshooting

Generation Doesn’t Stop

  • Ensure interrupt_on() is properly configured on the route
  • Check that the node properly handles asyncio.CancelledError
  • Verify interrupt events are being properly emitted

Resource Leaks

  • Implement interrupt handlers to cleanup streaming connections
  • Use async with contexts for resource management
  • Cancel pending API calls in interrupt handlers

State Corruption

  • Save critical state before generation starts
  • Validate state consistency after interruptions
  • Consider using database transactions for critical state changes

This pattern is essential for creating natural, responsive voice agents that feel conversational rather than robotic.