Streaming Responses

Streaming delivers agent responses chunk by chunk in real time, reducing latency and producing a more natural conversational flow.

Pattern Overview

This pattern enables:

  • Low Latency: Start speaking as soon as the first chunk is available
  • Natural Flow: Stream responses as they’re generated by LLMs
  • Interruption Support: Generation can be interrupted between chunks
  • Progressive Processing: Handle large responses incrementally

Key Components

Events

  • AgentResponse: Individual response chunks with content
  • Chunk Types: Text chunks, audio chunks, or other media types
  • Progressive Delivery: Multiple events for a single logical response

Nodes

  • Async Generators: Use yield to emit response chunks
  • LLM Integration: Stream directly from LLM API responses
  • Chunk Processing: Transform and validate each chunk

Routes

  • stream(): Process async generators that yield multiple values
  • broadcast(): Send each chunk immediately to output
  • Interruption: Handle cancellation between chunks

Basic Streaming Example

from line.nodes import ReasoningNode
from line.events import AgentResponse

class StreamingChatNode(ReasoningNode):
    def __init__(self, system_prompt, llm_client):
        super().__init__(system_prompt=system_prompt)
        self.llm_client = llm_client

    async def process_context(self, context):
        # Format conversation for LLM
        messages = self.format_for_llm(context.events)

        # Stream response chunks
        async for chunk in self.llm_client.generate_stream(messages):
            if chunk.text:
                yield AgentResponse(
                    content=chunk.text,
                    chunk_type="text"
                )

# Route streaming responses
(
    bridge.on(UserStoppedSpeaking)
    .stream(node.generate)  # Processes each yielded chunk
    .broadcast()            # Sends each chunk immediately
)

Advanced Streaming Patterns

Chunked Processing with Validation

class ValidatedStreamingNode(ReasoningNode):
    async def process_context(self, context):
        accumulated_response = ""
        chunk_count = 0

        async for chunk in self.llm_client.generate_stream(context):
            # Validate each chunk
            if self.is_valid_chunk(chunk.text):
                accumulated_response += chunk.text
                chunk_count += 1

                # Yield validated chunk
                yield AgentResponse(
                    content=chunk.text,
                    chunk_type="text"
                )
            else:
                # Handle invalid chunks
                logger.warning(f"Invalid chunk filtered: {chunk.text}")

        # Optionally yield completion marker
        yield AgentGenerationComplete(
            total_length=len(accumulated_response),
            chunk_count=chunk_count
        )

    def is_valid_chunk(self, text):
        # Filter inappropriate content, malformed text, etc.
        return (
            text and
            len(text.strip()) > 0 and
            not self.contains_inappropriate_content(text)
        )

Multi-Stage Streaming Pipeline

async def preprocess_chunks(message):
    """First stage: clean and format chunks."""
    chunk = message.event

    # Clean up chunk text
    cleaned_text = clean_text(chunk.content)

    # Add formatting
    if needs_punctuation(cleaned_text):
        cleaned_text = add_punctuation(cleaned_text)

    yield AgentResponse(
        content=cleaned_text,
        chunk_type="preprocessed"
    )

async def postprocess_chunks(message):
    """Second stage: final processing and validation."""
    chunk = message.event

    # Apply final transformations
    final_text = apply_style_guide(chunk.content)

    # Validate before sending to user
    if is_appropriate_for_user(final_text):
        yield AgentResponse(
            content=final_text,
            chunk_type="final"
        )

# Multi-stage streaming pipeline
(
    bridge.on(UserStoppedSpeaking)
    .stream(node.generate)        # Generate raw chunks
    .stream(preprocess_chunks)    # Clean chunks
    .stream(postprocess_chunks)   # Final processing
    .broadcast()                  # Send to user
)

Buffered Streaming

class BufferedStreamingNode(ReasoningNode):
    def __init__(self, buffer_size=50):
        super().__init__()
        self.buffer_size = buffer_size

    async def process_context(self, context):
        buffer = ""

        async for chunk in self.llm_client.generate_stream(context):
            buffer += chunk.text

            # Send buffer when it reaches target size or at sentence boundaries
            if (len(buffer) >= self.buffer_size or
                    self.ends_with_sentence(buffer)):
                yield AgentResponse(
                    content=buffer,
                    chunk_type="buffered"
                )
                buffer = ""

        # Send any remaining content
        if buffer.strip():
            yield AgentResponse(
                content=buffer,
                chunk_type="final"
            )

    def ends_with_sentence(self, text):
        return text.rstrip().endswith(('.', '!', '?'))

Streaming with Tool Integration

class ToolAwareStreamingNode(ReasoningNode):
    async def process_context(self, context):
        # Check for tool results first
        tool_results = [e for e in context.events if isinstance(e, ToolResult)]

        if tool_results:
            # Stream response incorporating tool results
            async for chunk in self.generate_with_tools(context, tool_results):
                yield chunk
        else:
            # Check if we need to call tools
            user_input = context.get_latest_user_transcript_message()

            if self.requires_tool_call(user_input):
                # Call tool first
                tool_call = self.determine_tool_call(user_input)
                yield tool_call

                # Stream initial response while tool executes
                yield AgentResponse(content="Let me check that for you...")
            else:
                # Normal streaming response
                async for chunk in self.generate_normal_response(context):
                    yield chunk

    async def generate_with_tools(self, context, tool_results):
        """Stream response that incorporates tool results."""
        # Format context with tool results
        enhanced_context = self.add_tool_results_to_context(context, tool_results)

        async for chunk in self.llm_client.generate_stream(enhanced_context):
            if chunk.text:
                yield AgentResponse(content=chunk.text)

Interruption-Aware Streaming

class InterruptibleStreamingNode(ReasoningNode):
    def __init__(self):
        super().__init__()
        self.current_stream = None
        self.chunks_sent = 0

    def on_interrupt_generate(self, message):
        """Handle interruption during streaming."""
        logger.info(f"Streaming interrupted after {self.chunks_sent} chunks")

        # Clean up streaming resources
        if self.current_stream:
            # Cancel ongoing LLM stream
            self.current_stream.cancel()
            self.current_stream = None

        self.chunks_sent = 0

    async def process_context(self, context):
        try:
            self.chunks_sent = 0
            self.current_stream = self.llm_client.generate_stream(context)

            async for chunk in self.current_stream:
                if chunk.text:
                    yield AgentResponse(content=chunk.text)
                    self.chunks_sent += 1

        except asyncio.CancelledError:
            logger.info("Stream cancelled due to interruption")
            raise
        finally:
            self.current_stream = None

# Route with interruption support
(
    bridge.on(UserStoppedSpeaking)
    .interrupt_on(UserStartedSpeaking, handler=node.on_interrupt_generate)
    .stream(node.generate)
    .broadcast()
)

Performance Optimizations

Parallel Streaming

class ParallelStreamingNode(ReasoningNode):
    async def process_context(self, context):
        # Start multiple streams in parallel for faster response
        streams = [
            self.generate_main_response(context),
            self.generate_context_analysis(context),
            self.generate_suggestions(context)
        ]

        # Stream results as they become available
        async for chunk in self.merge_streams(streams):
            yield chunk

    async def merge_streams(self, streams):
        """Merge multiple async generators, yielding chunks as they arrive."""
        iterators = [stream.__aiter__() for stream in streams]

        # Map each pending __anext__() task back to the iterator that produced it
        pending = {
            asyncio.ensure_future(iterator.__anext__()): iterator
            for iterator in iterators
        }

        while pending:
            # Wait for the next chunk from any stream
            done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                iterator = pending.pop(task)
                try:
                    chunk = task.result()
                except StopAsyncIteration:
                    # Stream completed; stop polling it
                    continue

                yield chunk
                # Request the next chunk from the same stream
                pending[asyncio.ensure_future(iterator.__anext__())] = iterator

Cached Streaming

class CachedStreamingNode(ReasoningNode):
    def __init__(self):
        super().__init__()
        self.response_cache = {}

    async def process_context(self, context):
        # Create cache key from context
        cache_key = self.create_cache_key(context)

        if cache_key in self.response_cache:
            # Stream cached response
            cached_response = self.response_cache[cache_key]
            for chunk in cached_response:
                yield AgentResponse(content=chunk)
        else:
            # Generate and cache new response
            response_chunks = []

            async for chunk in self.llm_client.generate_stream(context):
                if chunk.text:
                    response_chunks.append(chunk.text)
                    yield AgentResponse(content=chunk.text)

            # Cache the complete response
            self.response_cache[cache_key] = response_chunks

Best Practices

  1. Small Chunks: Send manageable chunk sizes for smooth streaming
  2. Buffer Management: Use appropriate buffering for sentence boundaries
  3. Error Handling: Handle stream cancellation gracefully
  4. Resource Cleanup: Always clean up streaming resources on interruption
  5. Progress Tracking: Monitor streaming progress for debugging
  6. Rate Limiting: Consider rate limits for high-frequency streaming (a pacing sketch follows this list)
  7. Memory Management: Clear large responses after streaming completes
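
A minimal sketch of the rate-limiting idea from item 6, assuming a simple pacing wrapper. The paced helper and its min_interval value are illustrative, not part of the Line API:

import asyncio
import time

async def paced(chunks, min_interval=0.05):
    """Illustrative pacing wrapper (assumption, not a Line API): enforce a
    minimum interval between emitted chunks so downstream consumers such as
    TTS are not flooded."""
    last_emit = 0.0
    async for chunk in chunks:
        wait = min_interval - (time.monotonic() - last_emit)
        if wait > 0:
            await asyncio.sleep(wait)
        last_emit = time.monotonic()
        yield chunk

# Inside process_context, wrap the raw LLM stream:
#     async for chunk in paced(self.llm_client.generate_stream(messages)):
#         if chunk.text:
#             yield AgentResponse(content=chunk.text, chunk_type="text")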

Common Use Cases

  • Conversational Agents: Real-time chat responses
  • Content Generation: Long-form content with immediate feedback
  • Live Translation: Streaming translation of ongoing speech
  • Code Generation: Progressive code output with syntax validation
  • Data Analysis: Streaming analysis results as they’re computed
  • Multi-modal Responses: Streaming text while preparing audio/images

Troubleshooting

Choppy Streaming

  • Increase buffer size for smoother delivery
  • Check network latency between the LLM API and the application (see the timing sketch below)
  • Monitor CPU usage during chunk processing
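
To tell whether the gaps originate upstream (LLM API, network) or in local chunk processing, one option is a small timing wrapper around the stream. The timed_chunks helper below is a debugging sketch, not part of the Line API:

import logging
import time

logger = logging.getLogger(__name__)

async def timed_chunks(stream):
    """Debugging sketch: log the gap between consecutive chunks so choppiness
    can be attributed to the upstream stream or to local processing."""
    previous = time.monotonic()
    async for chunk in stream:
        now = time.monotonic()
        logger.info("inter-chunk gap: %.0f ms", (now - previous) * 1000)
        previous = now
        yield chunk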

Memory Issues

  • Implement response chunk limits and cap any response caches (see the sketch below)
  • Clear large context periodically
  • Monitor memory usage during long streams
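
One way to bound memory, for example in the CachedStreamingNode above, is to cap the cache size with least-recently-used eviction. BoundedResponseCache is a sketch, not a Line component:

from collections import OrderedDict

class BoundedResponseCache:
    """Sketch of a size-capped response cache with LRU eviction."""

    def __init__(self, max_entries=128):
        self._entries = OrderedDict()
        self._max_entries = max_entries

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as recently used
        return self._entries[key]

    def put(self, key, chunks):
        self._entries[key] = chunks
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict the oldest entry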

Interruption Problems

  • Ensure proper CancelledError handling in async generators
  • Test interruption at different streaming stages (a minimal test sketch follows)
  • Verify resource cleanup in interrupt handlers
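
A minimal, framework-independent sketch of such a test; fake_stream and the sleep timings are stand-ins, not part of the Line API:

import asyncio

async def fake_stream():
    """Stand-in for an LLM chunk stream."""
    for i in range(100):
        await asyncio.sleep(0.01)
        yield f"chunk-{i}"

async def consume(stream, collected):
    async for chunk in stream:
        collected.append(chunk)

async def test_interruption_mid_stream():
    collected = []
    task = asyncio.create_task(consume(fake_stream(), collected))
    await asyncio.sleep(0.05)   # let a few chunks through
    task.cancel()               # simulate UserStartedSpeaking mid-response
    try:
        await task
    except asyncio.CancelledError:
        pass
    assert 0 < len(collected) < 100  # interrupted partway through

asyncio.run(test_interruption_mid_stream())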

This pattern is essential for creating responsive, natural-feeling voice agents that provide immediate feedback to users while generating comprehensive responses.