Interruption Handling

Interruption handling allows agents to gracefully stop generating responses when the user starts speaking, creating natural conversational flow.

Pattern Overview

This pattern enables:

  • Natural Conversation: User can interrupt agent at any time
  • Graceful Cancellation: Agent stops generating without errors
  • Resource Cleanup: Proper cleanup of ongoing operations
  • Seamless Recovery: Agent can resume processing new user input

Key Components

Events

  • UserStartedSpeaking: Triggers interruption of ongoing generation
  • UserStoppedSpeaking: Marks the end of the user's turn; in the routes below it triggers response generation
  • AgentGenerationComplete: Signals when generation finishes

Routes

  • interrupt_on(): Defines which events should cancel the current operation
  • Interrupt Handlers: Custom functions to handle cancellation cleanup
  • Task Cancellation: The in-flight async task is cancelled automatically (see the asyncio sketch below)
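
Under the hood this relies on standard asyncio behavior: cancelling a task raises asyncio.CancelledError at its current await point, including inside async generators. The framework-free sketch below (stream_response and main are illustrative names, not part of any API) shows that mechanism in isolation:

import asyncio

async def stream_response():
    """Stands in for a node's streaming generation loop."""
    try:
        for i in range(100):
            await asyncio.sleep(0.1)   # e.g. waiting on the next LLM chunk
            print(f"chunk {i}")
    except asyncio.CancelledError:
        print("generation cancelled, cleaning up")
        raise                          # re-raise so cancellation completes

async def main():
    task = asyncio.create_task(stream_response())
    await asyncio.sleep(0.35)          # pretend the user starts speaking here
    task.cancel()                      # conceptually what interrupt_on() does for you
    try:
        await task
    except asyncio.CancelledError:
        print("task finished cancelling")

asyncio.run(main())

This is the same shape the node-level examples below follow: catch CancelledError, clean up, and re-raise.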

Nodes

  • Interrupt Awareness: Nodes handle asyncio.CancelledError gracefully
  • Cleanup Logic: Implement interrupt handlers for resource cleanup
  • State Recovery: Maintain consistent state after interruptions

Basic Example

# Basic interruption setup
(
    bridge.on(UserStoppedSpeaking)
    .interrupt_on(UserStartedSpeaking)
    .stream(node.generate)
    .broadcast()
)

# With custom interrupt handler
async def handle_interrupt(message):
    logger.info("User interrupted, stopping generation")
    # Perform any needed cleanup
    await cleanup_resources()

(
    bridge.on(UserStoppedSpeaking)
    .interrupt_on(UserStartedSpeaking, handler=handle_interrupt)
    .stream(node.generate)
    .broadcast()
)

Node-Level Interruption Handling

class InterruptAwareNode(ReasoningNode):
    def on_interrupt_generate(self, message):
        """Called when generation is interrupted."""
        logger.info("Generation interrupted by user")
        # Clean up streaming resources, cancel API calls, etc.

    async def process_context(self, context):
        try:
            # Stream response chunks
            # (messages is the prompt built from `context`; construction omitted here)
            async for chunk in self.llm_client.generate_stream(messages):
                yield AgentResponse(content=chunk.text)

        except asyncio.CancelledError:
            # Handle cancellation gracefully
            logger.info("Generation cancelled due to user interruption")
            # Perform any final cleanup
            raise  # Re-raise to complete cancellation

# Connect the interrupt handler in the route
(
    bridge.on(UserStoppedSpeaking)
    .interrupt_on(UserStartedSpeaking, handler=node.on_interrupt_generate)
    .stream(node.generate)
    .broadcast()
)

Advanced Patterns

Conditional Interruption

# Only allow interruption after the agent has spoken for a minimum time
def can_interrupt(message):
    return time.time() - generation_start_time > MIN_SPEAK_TIME

(
    bridge.on(UserStoppedSpeaking)
    .interrupt_on(UserStartedSpeaking, condition=can_interrupt)
    .stream(node.generate)
    .broadcast()
)
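
The example above leaves generation_start_time and MIN_SPEAK_TIME undefined. One way to maintain them, sketched below with a hypothetical GenerationTimer helper (not part of the framework), is to record the start time whenever generation begins and read it from the condition callback:

import time

MIN_SPEAK_TIME = 1.5  # seconds the agent must speak before it can be interrupted

class GenerationTimer:
    """Hypothetical helper that tracks when the current generation started."""

    def __init__(self):
        self.started_at = None

    def mark_start(self):
        # Call this when generation begins, e.g. at the top of process_context
        self.started_at = time.monotonic()   # monotonic clock for elapsed time

    def can_interrupt(self, message):
        # Disallow interruption until the agent has spoken for MIN_SPEAK_TIME
        if self.started_at is None:
            return True
        return time.monotonic() - self.started_at > MIN_SPEAK_TIME

timer = GenerationTimer()

# Route wiring mirrors the example above
(
    bridge.on(UserStoppedSpeaking)
    .interrupt_on(UserStartedSpeaking, condition=timer.can_interrupt)
    .stream(node.generate)
    .broadcast()
)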

Multiple Interrupt Events

# Interrupt on multiple event types
(
    bridge.on(UserStoppedSpeaking)
    .interrupt_on([UserStartedSpeaking, EmergencyStop, AgentHandoff])
    .stream(node.generate)
    .broadcast()
)

State Management During Interruptions

class StatefulInterruptNode(ReasoningNode):
    def __init__(self):
        super().__init__()
        self.generation_state = None

    def on_interrupt_generate(self, message):
        # Save state for potential recovery
        # (current_response and context are assumed to be tracked by this node while it generates)
        self.generation_state = {
            'interrupted_at': time.time(),
            'partial_response': self.current_response,
            'context_snapshot': self.context.copy(),
        }
        logger.info("Saved generation state for recovery")

    async def process_context(self, context):
        try:
            # Check if resuming from an interruption
            if self.generation_state:
                logger.info("Resuming from previous interruption")
                # Potentially use the saved state here
                self.generation_state = None

            # Normal generation flow
            async for chunk in self.generate_response(context):
                yield AgentResponse(content=chunk)

        except asyncio.CancelledError:
            logger.info("Generation cancelled")
            raise
Best Practices

  1. Always Handle CancelledError: Nodes should gracefully handle task cancellation
  2. Cleanup Resources: Use interrupt handlers to clean up API calls, files, and connections (see the sketch after this list)
  3. Maintain State Consistency: Ensure node state remains valid after interruptions
  4. Log Interruptions: Track interruption patterns for debugging and optimization
  5. Fast Interruption Response: Keep interrupt handlers lightweight and fast
  6. Recovery Planning: Consider how to handle partial responses and state recovery
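
As an illustration of practices 2 and 5, the sketch below keeps the interrupt handler fast: it only cancels an in-flight request task and closes a streaming handle, both of which are assumed to be tracked by the node (active_request and tts_stream are hypothetical attributes, not framework fields):

class CleanupAwareNode(ReasoningNode):
    def __init__(self):
        super().__init__()
        self.active_request = None   # e.g. an asyncio.Task wrapping an API call
        self.tts_stream = None       # e.g. an open streaming connection

    async def on_interrupt_generate(self, message):
        # Keep this fast: cancel/close and return; heavy work belongs elsewhere
        logger.info("Interrupt received, releasing resources")
        if self.active_request is not None:
            self.active_request.cancel()
            self.active_request = None
        if self.tts_stream is not None:
            await self.tts_stream.close()
            self.tts_stream = None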

Common Use Cases

  • Voice Conversations: Natural turn-taking in voice interactions
  • Long Responses: Allow interruption of lengthy agent responses
  • Emergency Stops: Immediate cancellation for safety or escalation
  • Context Switches: Interrupt current task when user changes topic
  • Multi-Agent Handoffs: Cancel current agent when transferring to another

Troubleshooting

Generation Doesn’t Stop

  • Ensure interrupt_on() is properly configured on the route
  • Check that the node properly handles asyncio.CancelledError (a standalone check is sketched after this list)
  • Verify interrupt events are being properly emitted
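
If generation keeps running after an interrupt, it often helps to verify the node's cancellation behavior in isolation, outside the route. A minimal, framework-free check (collect_chunks and check_cancellation are illustrative helpers that drive process_context by hand):

import asyncio

async def collect_chunks(node, context):
    # Consume the node's async generator the way the framework would
    async for _ in node.process_context(context):
        pass

async def check_cancellation(node, context):
    task = asyncio.create_task(collect_chunks(node, context))
    await asyncio.sleep(0.2)       # let a few chunks stream
    task.cancel()                  # simulate the interrupt firing

    done, pending = await asyncio.wait({task}, timeout=1.0)
    if pending:
        print("node is still running; CancelledError is being swallowed")
    elif task.cancelled():
        print("node cancelled cleanly")
    else:
        print("node stopped but did not propagate CancelledError")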

Resource Leaks

  • Implement interrupt handlers to clean up streaming connections
  • Use async with blocks so resources are released even when the task is cancelled (see the sketch after this list)
  • Cancel pending API calls in interrupt handlers
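
async with (and finally) blocks run their exit code even when the consuming task is cancelled mid-stream, which makes them a reliable place to release connections. A sketch, assuming a hypothetical connect_to_provider() that opens the streaming connection:

from contextlib import asynccontextmanager

@asynccontextmanager
async def open_stream():
    stream = await connect_to_provider()   # hypothetical: open the streaming connection
    try:
        yield stream
    finally:
        # Runs on normal exit and when the task is cancelled mid-stream
        await stream.close()

async def process_context(self, context):
    async with open_stream() as stream:
        async for chunk in stream:
            yield AgentResponse(content=chunk)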

State Corruption

  • Save critical state before generation starts (see the sketch after this list)
  • Validate state consistency after interruptions
  • Consider using database transactions for critical state changes
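
A minimal way to combine the first two points, assuming the node keeps its mutable state in a single dict (the snapshot and validation helpers are illustrative, not framework APIs):

import asyncio
import copy

class SafeStateNode(ReasoningNode):
    def __init__(self):
        super().__init__()
        self.state = {"turn_count": 0, "pending_action": None}

    def _validate_state(self):
        # Cheap invariant checks; extend with whatever your node requires
        assert isinstance(self.state["turn_count"], int)

    async def process_context(self, context):
        snapshot = copy.deepcopy(self.state)   # save before mutating anything
        try:
            self.state["turn_count"] += 1
            async for chunk in self.generate_response(context):
                yield AgentResponse(content=chunk)
        except asyncio.CancelledError:
            self.state = snapshot              # roll back to a known-good state
            self._validate_state()
            raise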

This pattern is essential for creating natural, responsive voice agents that feel conversational rather than robotic.