Streaming responses enable real-time, chunk-by-chunk delivery of agent responses, providing lower latency and more natural conversational flow.

Pattern Overview

This pattern enables:
  • Low Latency: Start speaking as soon as the first chunk is available
  • Natural Flow: Stream responses as they’re generated by LLMs
  • Interruption Support: Each chunk can be individually interrupted
  • Progressive Processing: Handle large responses incrementally

Key Components

Events

  • AgentResponse: Individual response chunks with content
  • Chunk Types: Text chunks, audio chunks, or other media types
  • Progressive Delivery: Multiple events for a single logical response

Nodes

  • Async Generators: Use yield to emit response chunks
  • LLM Integration: Stream directly from LLM API responses
  • Chunk Processing: Transform and validate each chunk

Routes

  • stream(): Process async generators that yield multiple values
  • broadcast(): Send each chunk immediately to output
  • Interruption: Handle cancellation between chunks

Basic Streaming Example

from line.nodes import ReasoningNode
from line.events import AgentResponse, UserStoppedSpeaking


class StreamingChatNode(ReasoningNode):
    def __init__(self, system_prompt, llm_client):
        super().__init__(system_prompt=system_prompt)
        self.llm_client = llm_client

    async def process_context(self, context):
        # Format conversation for LLM
        messages = self.format_for_llm(context.events)

        # Stream response chunks
        async for chunk in self.llm_client.generate_stream(messages):
            if chunk.text:
                yield AgentResponse(
                    content=chunk.text, chunk_type="text"
                )


# Route streaming responses
(
    bridge.on(UserStoppedSpeaking)
    .stream(node.generate)  # Processes each yielded chunk
    .broadcast()  # Sends each chunk immediately
)
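
The node above calls a format_for_llm helper that is not shown. A minimal sketch of such a method follows; the UserTranscriptionReceived event type, the .content attributes, the self.system_prompt attribute, and the OpenAI-style message format are assumptions to adapt to your own event types and LLM client.

class StreamingChatNode(ReasoningNode):
    # ... __init__ and process_context as above ...

    def format_for_llm(self, events):
        """Hypothetical helper: turn conversation events into chat messages."""
        # self.system_prompt and UserTranscriptionReceived are assumptions
        messages = [{"role": "system", "content": self.system_prompt}]

        for event in events:
            if isinstance(event, UserTranscriptionReceived):
                messages.append({"role": "user", "content": event.content})
            elif isinstance(event, AgentResponse):
                messages.append(
                    {"role": "assistant", "content": event.content}
                )

        return messages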

Advanced Streaming Patterns

Chunked Processing with Validation

class ValidatedStreamingNode(ReasoningNode):
    async def process_context(self, context):
        accumulated_response = ""
        chunk_count = 0

        async for chunk in self.llm_client.generate_stream(context):
            # Validate each chunk
            if self.is_valid_chunk(chunk.text):
                accumulated_response += chunk.text
                chunk_count += 1

                # Yield validated chunk
                yield AgentResponse(
                    content=chunk.text, chunk_type="text"
                )
            else:
                # Handle invalid chunks
                logger.warning(
                    f"Invalid chunk filtered: {chunk.text}"
                )

        # Optionally yield completion marker
        yield AgentGenerationComplete(
            total_length=len(accumulated_response),
            chunk_count=chunk_count,
        )

    def is_valid_chunk(self, text):
        # Filter inappropriate content, malformed text, etc.
        return (
            text
            and len(text.strip()) > 0
            and not self.contains_inappropriate_content(text)
        )
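
contains_inappropriate_content is left undefined above. A deliberately simple, hypothetical stand-in is sketched below; a real deployment would typically use a moderation model or service rather than a keyword list.

class ValidatedStreamingNode(ReasoningNode):
    # ... process_context and is_valid_chunk as above ...

    BLOCKED_TERMS = ("example banned phrase",)  # placeholder terms

    def contains_inappropriate_content(self, text):
        lowered = text.lower()
        return any(term in lowered for term in self.BLOCKED_TERMS)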

Multi-Stage Streaming Pipeline

async def preprocess_chunks(message):
    """First stage: clean and format chunks."""
    chunk = message.event

    # Clean up chunk text
    cleaned_text = clean_text(chunk.content)

    # Add formatting
    if needs_punctuation(cleaned_text):
        cleaned_text = add_punctuation(cleaned_text)

    yield AgentResponse(
        content=cleaned_text, chunk_type="preprocessed"
    )


async def postprocess_chunks(message):
    """Second stage: final processing and validation."""
    chunk = message.event

    # Apply final transformations
    final_text = apply_style_guide(chunk.content)

    # Validate before sending to user
    if is_appropriate_for_user(final_text):
        yield AgentResponse(content=final_text, chunk_type="final")


# Multi-stage streaming pipeline
(
    bridge.on(UserStoppedSpeaking)
    .stream(node.generate)  # Generate raw chunks
    .stream(preprocess_chunks)  # Clean chunks
    .stream(postprocess_chunks)  # Final processing
    .broadcast()  # Send to user
)
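
The pipeline stages above call clean_text, needs_punctuation, add_punctuation, apply_style_guide, and is_appropriate_for_user, none of which are defined here. Minimal, hypothetical versions of the first three are sketched below to show the intent; real implementations would be more involved, and the remaining two are left out.

# Hypothetical helpers for the preprocessing stage.
def clean_text(text):
    """Collapse whitespace and strip stray spacing artifacts."""
    return " ".join(text.split())


def needs_punctuation(text):
    """True if the chunk ends without any terminating punctuation."""
    stripped = text.rstrip()
    return bool(stripped) and not stripped.endswith((".", "!", "?", ","))


def add_punctuation(text):
    """Naive fallback: terminate the chunk with a period."""
    return text.rstrip() + "."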

Buffered Streaming

class BufferedStreamingNode(ReasoningNode):
    def __init__(self, buffer_size=50):
        super().__init__()
        self.buffer_size = buffer_size

    async def process_context(self, context):
        buffer = ""

        async for chunk in self.llm_client.generate_stream(context):
            buffer += chunk.text

            # Send buffer when it reaches target size or at sentence boundaries
            if (
                len(buffer) >= self.buffer_size
                or self.ends_with_sentence(buffer)
            ):
                yield AgentResponse(
                    content=buffer, chunk_type="buffered"
                )
                buffer = ""

        # Send any remaining content
        if buffer.strip():
            yield AgentResponse(content=buffer, chunk_type="final")

    def ends_with_sentence(self, text):
        return text.rstrip().endswith((".", "!", "?"))

Streaming with Tool Integration

class ToolAwareStreamingNode(ReasoningNode):
    async def process_context(self, context):
        # Check for tool results first
        tool_results = [
            e for e in context.events if isinstance(e, ToolResult)
        ]

        if tool_results:
            # Stream response incorporating tool results
            async for chunk in self.generate_with_tools(
                context, tool_results
            ):
                yield chunk
        else:
            # Check if we need to call tools
            user_input = context.get_latest_user_transcript_message()

            if self.requires_tool_call(user_input):
                # Call tool first
                tool_call = self.determine_tool_call(user_input)
                yield tool_call

                # Stream initial response while tool executes
                yield AgentResponse(
                    content="Let me check that for you..."
                )
            else:
                # Normal streaming response
                async for chunk in self.generate_normal_response(
                    context
                ):
                    yield chunk

    async def generate_with_tools(self, context, tool_results):
        """Stream response that incorporates tool results."""
        # Format context with tool results
        enhanced_context = self.add_tool_results_to_context(
            context, tool_results
        )

        async for chunk in self.llm_client.generate_stream(
            enhanced_context
        ):
            if chunk.text:
                yield AgentResponse(content=chunk.text)
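
requires_tool_call, determine_tool_call, and add_tool_results_to_context are left abstract above. As one illustration, a toy keyword heuristic for requires_tool_call is sketched below; in practice the LLM's native tool calling usually makes this decision, and determine_tool_call would construct whatever tool-call event your framework expects (not shown). The keyword list and the .content attribute are assumptions.

class ToolAwareStreamingNode(ReasoningNode):
    # ... process_context and generate_with_tools as above ...

    TOOL_KEYWORDS = ("weather", "order status", "account balance")  # placeholders

    def requires_tool_call(self, user_input):
        if user_input is None:
            return False
        # .content is an assumed attribute on the transcript message
        text = user_input.content.lower()
        return any(keyword in text for keyword in self.TOOL_KEYWORDS)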

Interruption-Aware Streaming

class InterruptibleStreamingNode(ReasoningNode):
    def __init__(self):
        super().__init__()
        self.current_stream = None
        self.chunks_sent = 0

    def on_interrupt_generate(self, message):
        """Handle interruption during streaming."""
        logger.info(
            f"Streaming interrupted after {self.chunks_sent} chunks"
        )

        # Clean up streaming resources
        if self.current_stream:
            # Cancel ongoing LLM stream
            self.current_stream.cancel()
            self.current_stream = None

        self.chunks_sent = 0

    async def process_context(self, context):
        try:
            self.chunks_sent = 0
            self.current_stream = self.llm_client.generate_stream(
                context
            )

            async for chunk in self.current_stream:
                if chunk.text:
                    yield AgentResponse(content=chunk.text)
                    self.chunks_sent += 1

        except asyncio.CancelledError:
            logger.info("Stream cancelled due to interruption")
            raise
        finally:
            self.current_stream = None


# Route with interruption support
(
    bridge.on(UserStoppedSpeaking)
    .interrupt_on(
        UserStartedSpeaking, handler=node.on_interrupt_generate
    )
    .stream(node.generate)
    .broadcast()
)

Performance Optimizations

Parallel Streaming

class ParallelStreamingNode(ReasoningNode):
    async def process_context(self, context):
        # Start multiple streams in parallel for faster response
        tasks = [
            self.generate_main_response(context),
            self.generate_context_analysis(context),
            self.generate_suggestions(context),
        ]

        # Stream results as they become available
        async for chunk in self.merge_streams(tasks):
            yield chunk

    async def merge_streams(self, generators):
        """Merge multiple async generators, yielding chunks in arrival order."""
        # Map each pending __anext__() task back to its source generator
        pending = {
            asyncio.ensure_future(gen.__anext__()): gen
            for gen in generators
        }

        while pending:
            # Wait for the next chunk from any stream
            done, _ = await asyncio.wait(
                pending.keys(), return_when=asyncio.FIRST_COMPLETED
            )

            for task in done:
                gen = pending.pop(task)
                try:
                    chunk = task.result()
                except StopAsyncIteration:
                    # This stream is exhausted; stop polling it
                    continue

                yield chunk
                # Schedule the next chunk from the same stream
                pending[asyncio.ensure_future(gen.__anext__())] = gen

Cached Streaming

class CachedStreamingNode(ReasoningNode):
    def __init__(self):
        super().__init__()
        self.response_cache = {}

    async def process_context(self, context):
        # Create cache key from context
        cache_key = self.create_cache_key(context)

        if cache_key in self.response_cache:
            # Stream cached response
            cached_response = self.response_cache[cache_key]
            for chunk in cached_response:
                yield AgentResponse(content=chunk)
        else:
            # Generate and cache new response
            response_chunks = []

            async for chunk in self.llm_client.generate_stream(
                context
            ):
                if chunk.text:
                    response_chunks.append(chunk.text)
                    yield AgentResponse(content=chunk.text)

            # Cache the complete response
            self.response_cache[cache_key] = response_chunks
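
create_cache_key is not defined above. One hypothetical approach is to hash the latest user transcript, as sketched below; note that the cache in this example is unbounded, so a production version would cap it (for example with an LRU dictionary).

import hashlib  # standard library; would normally sit at module level


class CachedStreamingNode(ReasoningNode):
    # ... __init__ and process_context as above ...

    def create_cache_key(self, context):
        """Hypothetical key: identical user input replays the cached response."""
        latest = context.get_latest_user_transcript_message()
        # .content is an assumed attribute on the transcript message
        text = latest.content if latest else ""
        return hashlib.sha256(text.encode("utf-8")).hexdigest()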

Best Practices

  1. Small Chunks: Send manageable chunk sizes for smooth streaming
  2. Buffer Management: Use appropriate buffering for sentence boundaries
  3. Error Handling: Handle stream cancellation gracefully
  4. Resource Cleanup: Always clean up streaming resources on interruption
  5. Progress Tracking: Monitor streaming progress for debugging
  6. Rate Limiting: Consider rate limits for high-frequency streaming (see the pacing sketch after this list)
  7. Memory Management: Clear large responses after streaming completes
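
As a minimal sketch of practice 6, the hypothetical node below paces chunk emission so downstream consumers (TTS, transport) are not flooded. The 50 ms interval is an arbitrary placeholder, and ReasoningNode and AgentResponse follow the imports from the earlier examples.

import asyncio
import time


class PacedStreamingNode(ReasoningNode):
    MIN_CHUNK_INTERVAL = 0.05  # seconds between chunks; placeholder value

    async def process_context(self, context):
        last_emit = 0.0

        async for chunk in self.llm_client.generate_stream(context):
            if not chunk.text:
                continue

            # Sleep only as long as needed to respect the minimum interval
            elapsed = time.monotonic() - last_emit
            if elapsed < self.MIN_CHUNK_INTERVAL:
                await asyncio.sleep(self.MIN_CHUNK_INTERVAL - elapsed)

            yield AgentResponse(content=chunk.text)
            last_emit = time.monotonic()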

Common Use Cases

  • Conversational Agents: Real-time chat responses
  • Content Generation: Long-form content with immediate feedback
  • Live Translation: Streaming translation of ongoing speech
  • Code Generation: Progressive code output with syntax validation
  • Data Analysis: Streaming analysis results as they’re computed
  • Multi-modal Responses: Streaming text while preparing audio/images

Troubleshooting

Choppy Streaming

  • Increase buffer size for smoother delivery
  • Check network latency between LLM API and application
  • Monitor CPU usage during chunk processing

Memory Issues

  • Implement response chunk limits (a minimal sketch follows this list)
  • Clear large context periodically
  • Monitor memory usage during long streams
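
A minimal sketch of a per-turn chunk limit, assuming the same llm_client as the earlier examples; the limit of 200 chunks is an arbitrary placeholder.

class BoundedStreamingNode(ReasoningNode):
    MAX_CHUNKS_PER_TURN = 200  # placeholder limit; tune for your agent

    async def process_context(self, context):
        emitted = 0

        async for chunk in self.llm_client.generate_stream(context):
            if not chunk.text:
                continue

            yield AgentResponse(content=chunk.text)
            emitted += 1

            if emitted >= self.MAX_CHUNKS_PER_TURN:
                # Stop the turn rather than stream unbounded output
                break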

Interruption Problems

  • Ensure proper CancelledError handling in async generators
  • Test interruption at different streaming stages
  • Verify resource cleanup in interrupt handlers

This pattern is essential for creating responsive, natural-feeling voice agents that provide immediate feedback to users while generating comprehensive responses.