Streaming Responses

Streaming responses enable real-time, chunk-by-chunk delivery of agent responses, providing lower latency and more natural conversational flow.

Pattern Overview

This pattern enables:

  • Low Latency: Start speaking as soon as the first chunk is available
  • Natural Flow: Stream responses as they’re generated by LLMs
  • Interruption Support: Each chunk can be individually interrupted
  • Progressive Processing: Handle large responses incrementally

Key Components

Events

  • AgentResponse: Individual response chunks with content
  • Chunk Types: Text chunks, audio chunks, or other media types
  • Progressive Delivery: Multiple events for a single logical response

Nodes

  • Async Generators: Use yield to emit response chunks
  • LLM Integration: Stream directly from LLM API responses
  • Chunk Processing: Transform and validate each chunk

Routes

  • stream(): Process async generators that yield multiple values
  • broadcast(): Send each chunk immediately to output
  • Interruption: Handle cancellation between chunks

Basic Streaming Example

```python
from line.nodes import ReasoningNode
from line.events import AgentResponse, UserStoppedSpeaking


class StreamingChatNode(ReasoningNode):
    def __init__(self, system_prompt, llm_client):
        super().__init__(system_prompt=system_prompt)
        self.llm_client = llm_client

    async def process_context(self, context):
        # Format conversation for LLM
        messages = self.format_for_llm(context.events)

        # Stream response chunks
        async for chunk in self.llm_client.generate_stream(messages):
            if chunk.text:
                yield AgentResponse(content=chunk.text, chunk_type="text")


# Route streaming responses
(
    bridge.on(UserStoppedSpeaking)
    .stream(node.generate)  # Processes each yielded chunk
    .broadcast()            # Sends each chunk immediately
)
```

Advanced Streaming Patterns

Chunked Processing with Validation

```python
class ValidatedStreamingNode(ReasoningNode):
    async def process_context(self, context):
        accumulated_response = ""
        chunk_count = 0

        async for chunk in self.llm_client.generate_stream(context):
            # Validate each chunk
            if self.is_valid_chunk(chunk.text):
                accumulated_response += chunk.text
                chunk_count += 1

                # Yield validated chunk
                yield AgentResponse(content=chunk.text, chunk_type="text")
            else:
                # Handle invalid chunks
                logger.warning(f"Invalid chunk filtered: {chunk.text}")

        # Optionally yield completion marker
        yield AgentGenerationComplete(
            total_length=len(accumulated_response),
            chunk_count=chunk_count,
        )

    def is_valid_chunk(self, text):
        # Filter inappropriate content, malformed text, etc.
        return (
            text
            and len(text.strip()) > 0
            and not self.contains_inappropriate_content(text)
        )
```

Multi-Stage Streaming Pipeline

```python
async def preprocess_chunks(message):
    """First stage: clean and format chunks."""
    chunk = message.event

    # Clean up chunk text
    cleaned_text = clean_text(chunk.content)

    # Add formatting
    if needs_punctuation(cleaned_text):
        cleaned_text = add_punctuation(cleaned_text)

    yield AgentResponse(content=cleaned_text, chunk_type="preprocessed")


async def postprocess_chunks(message):
    """Second stage: final processing and validation."""
    chunk = message.event

    # Apply final transformations
    final_text = apply_style_guide(chunk.content)

    # Validate before sending to user
    if is_appropriate_for_user(final_text):
        yield AgentResponse(content=final_text, chunk_type="final")


# Multi-stage streaming pipeline
(
    bridge.on(UserStoppedSpeaking)
    .stream(node.generate)       # Generate raw chunks
    .stream(preprocess_chunks)   # Clean chunks
    .stream(postprocess_chunks)  # Final processing
    .broadcast()                 # Send to user
)
```

Buffered Streaming

```python
class BufferedStreamingNode(ReasoningNode):
    def __init__(self, buffer_size=50):
        super().__init__()
        self.buffer_size = buffer_size

    async def process_context(self, context):
        buffer = ""

        async for chunk in self.llm_client.generate_stream(context):
            buffer += chunk.text

            # Send buffer at target size or at sentence boundaries
            if len(buffer) >= self.buffer_size or self.ends_with_sentence(buffer):
                yield AgentResponse(content=buffer, chunk_type="buffered")
                buffer = ""

        # Send any remaining content
        if buffer.strip():
            yield AgentResponse(content=buffer, chunk_type="final")

    def ends_with_sentence(self, text):
        return text.rstrip().endswith((".", "!", "?"))
```

Streaming with Tool Integration

```python
class ToolAwareStreamingNode(ReasoningNode):
    async def process_context(self, context):
        # Check for tool results first
        tool_results = [e for e in context.events if isinstance(e, ToolResult)]

        if tool_results:
            # Stream response incorporating tool results
            async for chunk in self.generate_with_tools(context, tool_results):
                yield chunk
        else:
            # Check if we need to call tools
            user_input = context.get_latest_user_transcript_message()

            if self.requires_tool_call(user_input):
                # Call tool first
                tool_call = self.determine_tool_call(user_input)
                yield tool_call

                # Stream initial response while tool executes
                yield AgentResponse(content="Let me check that for you...")
            else:
                # Normal streaming response
                async for chunk in self.generate_normal_response(context):
                    yield chunk

    async def generate_with_tools(self, context, tool_results):
        """Stream response that incorporates tool results."""
        # Format context with tool results
        enhanced_context = self.add_tool_results_to_context(context, tool_results)

        async for chunk in self.llm_client.generate_stream(enhanced_context):
            if chunk.text:
                yield AgentResponse(content=chunk.text)
```

Interruption-Aware Streaming

```python
class InterruptibleStreamingNode(ReasoningNode):
    def __init__(self):
        super().__init__()
        self.current_stream = None
        self.chunks_sent = 0

    def on_interrupt_generate(self, message):
        """Handle interruption during streaming."""
        logger.info(f"Streaming interrupted after {self.chunks_sent} chunks")

        # Clean up streaming resources
        if self.current_stream:
            # Cancel ongoing LLM stream
            self.current_stream.cancel()
            self.current_stream = None

        self.chunks_sent = 0

    async def process_context(self, context):
        try:
            self.chunks_sent = 0
            self.current_stream = self.llm_client.generate_stream(context)

            async for chunk in self.current_stream:
                if chunk.text:
                    yield AgentResponse(content=chunk.text)
                    self.chunks_sent += 1

        except asyncio.CancelledError:
            logger.info("Stream cancelled due to interruption")
            raise
        finally:
            self.current_stream = None


# Route with interruption support
(
    bridge.on(UserStoppedSpeaking)
    .interrupt_on(UserStartedSpeaking, handler=node.on_interrupt_generate)
    .stream(node.generate)
    .broadcast()
)
```

Performance Optimizations

Parallel Streaming

```python
class ParallelStreamingNode(ReasoningNode):
    async def process_context(self, context):
        # Start multiple streams in parallel for faster response
        streams = [
            self.generate_main_response(context),
            self.generate_context_analysis(context),
            self.generate_suggestions(context),
        ]

        # Stream results as they become available
        async for chunk in self.merge_streams(streams):
            yield chunk

    async def merge_streams(self, streams):
        """Merge multiple async generators, yielding chunks as they arrive."""
        iterators = [stream.__aiter__() for stream in streams]

        # Map each in-flight __anext__() task back to its iterator
        pending = {asyncio.ensure_future(it.__anext__()): it for it in iterators}

        while pending:
            # Wait for the next chunk from any stream
            done, _ = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )

            for task in done:
                iterator = pending.pop(task)
                try:
                    chunk = task.result()
                except StopAsyncIteration:
                    # Stream completed; stop polling it
                    continue
                yield chunk
                # Request the next chunk from the same stream
                pending[asyncio.ensure_future(iterator.__anext__())] = iterator
```
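
The merge technique can be exercised standalone with toy streams. In this sketch, `ticker` and `merge` are illustrative helpers (not framework APIs); `merge` maps each in-flight `__anext__()` task back to its source generator so completed streams can be retired correctly:

```python
import asyncio


async def ticker(items, delay):
    """Toy stream: yield each item after a fixed delay."""
    for item in items:
        await asyncio.sleep(delay)
        yield item


async def merge(*streams):
    """Yield items from several async generators as soon as any produces one."""
    # Map each in-flight __anext__() task back to its source stream
    pending = {asyncio.ensure_future(s.__anext__()): s for s in streams}

    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            stream = pending.pop(task)
            try:
                item = task.result()
            except StopAsyncIteration:
                continue  # this stream is exhausted
            yield item
            # Ask the same stream for its next item
            pending[asyncio.ensure_future(stream.__anext__())] = stream


async def main():
    fast = ticker(["f1", "f2"], 0.01)
    slow = ticker(["s1", "s2"], 0.05)
    return [item async for item in merge(fast, slow)]
```

Running `asyncio.run(main())` yields the fast stream's chunks first, interleaved with the slow stream's as they become ready.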

Cached Streaming

```python
class CachedStreamingNode(ReasoningNode):
    def __init__(self):
        super().__init__()
        self.response_cache = {}

    async def process_context(self, context):
        # Create cache key from context
        cache_key = self.create_cache_key(context)

        if cache_key in self.response_cache:
            # Stream cached response
            for chunk in self.response_cache[cache_key]:
                yield AgentResponse(content=chunk)
        else:
            # Generate and cache new response
            response_chunks = []

            async for chunk in self.llm_client.generate_stream(context):
                if chunk.text:
                    response_chunks.append(chunk.text)
                    yield AgentResponse(content=chunk.text)

            # Cache the complete response
            self.response_cache[cache_key] = response_chunks
```
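
`create_cache_key` is left undefined above. One possible sketch, assuming the key is derived by hashing the most recent user transcripts (the function name, parameters, and hashing scheme here are illustrative, not part of the framework):

```python
import hashlib


def create_cache_key(transcripts, n_recent=3):
    """Derive a stable cache key from the most recent user transcripts."""
    # Only the last few turns matter; older context rarely changes the reply
    recent = "\n".join(transcripts[-n_recent:])
    return hashlib.sha256(recent.encode("utf-8")).hexdigest()
```

Identical recent transcripts map to the same key, so repeated questions hit the cache; any change in the recent turns produces a different key.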

Best Practices

  1. Small Chunks: Send manageable chunk sizes for smooth streaming
  2. Buffer Management: Use appropriate buffering for sentence boundaries
  3. Error Handling: Handle stream cancellation gracefully
  4. Resource Cleanup: Always clean up streaming resources on interruption
  5. Progress Tracking: Monitor streaming progress for debugging
  6. Rate Limiting: Consider rate limits for high-frequency streaming
  7. Memory Management: Clear large responses after streaming completes
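
Rate limiting (practice 6) can be sketched as a generator wrapper that enforces a minimum gap between chunks. `throttle` below is a hypothetical helper, not a framework API:

```python
import asyncio
import time


async def throttle(stream, min_interval=0.05):
    """Re-yield chunks from an async stream, enforcing a minimum
    gap between consecutive chunks to smooth delivery."""
    last_emit = None
    async for chunk in stream:
        if last_emit is not None:
            wait = min_interval - (time.monotonic() - last_emit)
            if wait > 0:
                await asyncio.sleep(wait)
        last_emit = time.monotonic()
        yield chunk
```

Because it is itself an async generator, it can be composed with any of the streaming nodes above without changing their code.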

Common Use Cases

  • Conversational Agents: Real-time chat responses
  • Content Generation: Long-form content with immediate feedback
  • Live Translation: Streaming translation of ongoing speech
  • Code Generation: Progressive code output with syntax validation
  • Data Analysis: Streaming analysis results as they’re computed
  • Multi-modal Responses: Streaming text while preparing audio/images

Troubleshooting

Choppy Streaming

  • Increase buffer size for smoother delivery
  • Check network latency between LLM API and application
  • Monitor CPU usage during chunk processing

Memory Issues

  • Implement response chunk limits
  • Clear large context periodically
  • Monitor memory usage during long streams

Interruption Problems

  • Ensure proper CancelledError handling in async generators
  • Test interruption at different streaming stages
  • Verify resource cleanup in interrupt handlers
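
Cleanup on early termination can be verified with a plain async generator: closing it mid-stream runs the `finally` block, which is where streaming resources should be released. This standalone sketch uses no framework APIs:

```python
import asyncio


async def stream_with_cleanup(log):
    """Async generator that releases resources in `finally`,
    so early termination (interruption) still cleans up."""
    try:
        for i in range(100):
            await asyncio.sleep(0.001)
            yield i
    finally:
        # Real code would cancel LLM streams, close sockets, etc.
        log.append("cleaned up")


async def main():
    log = []
    gen = stream_with_cleanup(log)
    received = []
    async for item in gen:
        received.append(item)
        if len(received) == 3:
            break  # consumer stops early, like a user interruption
    await gen.aclose()  # deterministically triggers the finally block
    return received, log
```

Note that breaking out of `async for` does not close the generator by itself; call `aclose()` (or rely on task cancellation) so the `finally` block runs at a predictable point.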

This pattern is essential for creating responsive, natural-feeling voice agents that provide immediate feedback to users while generating comprehensive responses.