# Streaming Responses in Nexios
Nexios provides powerful streaming capabilities for handling large datasets, real-time data, and long-polling scenarios. This guide covers how to implement and work with streaming responses.
Streaming responses allow you to send data to the client as it becomes available, rather than buffering everything in memory first. This is particularly useful for:
- Large file downloads
- Real-time data feeds
- Long-polling implementations
- Server-Sent Events (SSE)
- Progress reporting
## Basic Streaming
### Understanding Async Generators for Streaming
Before diving into the code, it's important to understand how async generators work in Python. An async generator is a special type of function that yields values asynchronously, allowing you to process and send data in chunks rather than all at once. This is particularly useful for streaming scenarios where you want to send data as it becomes available.
**Key Characteristics of Async Generators:**

- Defined using `async def` and contains at least one `yield` statement
- Returns an async generator object when called
- Can use `await` to wait for other coroutines
- Maintains its state between yields
- Must be consumed using `async for` or `anext()`
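To make these characteristics concrete, here's a small, framework-free sketch you can run on its own (the `countdown` generator is purely illustrative):

```python
import asyncio

# `async def` plus `yield` makes countdown() an async generator
async def countdown(n):
    while n > 0:
        yield n                   # state (the current n) persists between yields
        await asyncio.sleep(0.1)  # async generators may await other coroutines
        n -= 1

async def main():
    # Async generators are consumed with `async for` (or anext())
    async for value in countdown(3):
        print(value)  # prints 3, 2, 1

asyncio.run(main())
```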
### Basic Streaming Example
Here's how to create a simple streaming endpoint using an async generator. This example demonstrates the basic pattern you'll use for most streaming responses:
```python
import asyncio

from nexios import NexiosApp

app = NexiosApp()

@app.get("/stream")
async def stream_data(request, response):
    async def generate():
        for i in range(5):
            yield f"Data chunk {i}\n"
            await asyncio.sleep(1)  # Simulate work

    return response.stream(generate(), content_type="text/plain")
```
## Understanding Chunked Transfer Encoding
Chunked transfer encoding is a streaming data transfer mechanism available in HTTP/1.1. It allows a server to start sending a response before its total size is known, which is exactly the situation in most streaming scenarios.
**How Chunked Encoding Works:**
- The server breaks the response into a series of chunks
- Each chunk is preceded by its size in hexadecimal
- A zero-length chunk marks the end of the response
- The client receives and processes each chunk as it arrives
### Implementing Chunked Responses
In Nexios, you don't need to implement chunking manually; it's handled automatically when you use the `stream()` method. Here's how to create a chunked response:
@app.get("/chunked")
async def chunked_response(request, response):
async def generate():
yield "First chunk\n"
await asyncio.sleep(1)
yield "Second chunk\n"
await asyncio.sleep(1)
yield "Final chunk\n"
return response.stream(
generate(),
content_type="text/plain",
headers={"X-Streaming": "enabled"}
)
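For reference, here is roughly what the body of the response above looks like on the wire, written out as a Python bytes literal. This is a sketch of the framing only; the exact chunk boundaries depend on the server's buffering.

```python
# Each chunk is framed as "<size in hex>\r\n<data>\r\n";
# a zero-length chunk terminates the response body.
wire_body = (
    b"c\r\nFirst chunk\n\r\n"   # "First chunk\n" is 12 (0xc) bytes
    b"d\r\nSecond chunk\n\r\n"  # "Second chunk\n" is 13 (0xd) bytes
    b"c\r\nFinal chunk\n\r\n"   # "Final chunk\n" is 12 (0xc) bytes
    b"0\r\n\r\n"                # terminating zero-length chunk
)
```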
## Understanding Server-Sent Events (SSE)
Server-Sent Events (SSE) is a standard that allows a web server to push real-time updates to the client over HTTP. Unlike WebSockets, SSE is a one-way communication channel from server to client, making it simpler and more efficient for certain use cases.
**Key Features of SSE:**
- Simple text-based protocol
- Automatic reconnection
- Built-in event IDs for tracking
- Browser EventSource API for easy client-side handling
- Works over standard HTTP/HTTPS
**SSE Message Format:**

Each message consists of one or more lines of text in the format:

```
event: <event_name>
data: <message>
id: <message_id>
retry: <milliseconds>
```
### Implementing SSE in Nexios
Here's how to implement an SSE endpoint. The key is to use the `text/event-stream` content type and follow the SSE message format:
@app.get("/events")
async def sse_events(request, response):
async def event_stream():
try:
while True:
data = await get_latest_event() # Your event source
yield f"data: {data}\n\n"
# Keep the connection alive
await asyncio.sleep(1)
yield ":keepalive\n\n"
except asyncio.CancelledError:
# Handle client disconnection
pass
return response.stream(
event_stream(),
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
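In the browser, the `EventSource` API consumes this format natively. From Python, any streaming HTTP client works; here is a minimal consumer sketch using the third-party `httpx` package (the localhost URL is an assumption):

```python
import asyncio
import httpx

async def listen():
    async with httpx.AsyncClient(timeout=None) as client:
        # stream() yields the response before the body has finished arriving
        async with client.stream("GET", "http://localhost:8000/events") as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data:"):
                    print("event payload:", line[len("data:"):].strip())
                # Lines starting with ":" are SSE comments (our keepalives)

asyncio.run(listen())
```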
## Understanding File Streaming
Streaming files is essential when dealing with large files that shouldn't be loaded entirely into memory. This approach is memory-efficient and provides a better user experience as the client can start processing the file before the entire download is complete.
**Benefits of File Streaming:**
- Memory efficiency: Only a small portion of the file is in memory at any time
- Faster time-to-first-byte: Clients can start processing data immediately
- Better handling of large files: No memory limits based on file size
- Support for resumable downloads
**How File Streaming Works:**
- File is opened in binary read mode
- Data is read in fixed-size chunks (e.g., 8KB)
- Each chunk is yielded to the client
- Process continues until the entire file is sent
### Implementing File Downloads with Streaming
Here's how to implement a file download endpoint that streams the file efficiently:
```python
import aiofiles
from pathlib import Path

@app.get("/download/{filename}")
async def download_file(request, response, filename: str):
    # Note: sanitize `filename` in real applications to prevent path traversal
    file_path = Path("uploads") / filename
    if not file_path.is_file():
        return response.json(
            {"error": "File not found"},
            status_code=404
        )

    async def file_sender():
        async with aiofiles.open(file_path, "rb") as f:
            while chunk := await f.read(8192):  # 8KB chunks
                yield chunk

    return response.stream(
        file_sender(),
        content_type="application/octet-stream",
        headers={
            "Content-Disposition": f"attachment; filename={filename}",
            "Content-Length": str(file_path.stat().st_size)
        }
    )
```
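The benefits listed earlier include resumable downloads. Here is a hedged sketch of honoring the HTTP `Range` header. It assumes `request.headers` is a case-insensitive mapping and that `response.stream()` accepts a `status_code` keyword like `response.json()` does; check both against your Nexios version before relying on them.

```python
@app.get("/resumable/{filename}")
async def resumable_download(request, response, filename: str):
    file_path = Path("uploads") / filename
    if not file_path.is_file():
        return response.json({"error": "File not found"}, status_code=404)

    file_size = file_path.stat().st_size
    start = 0
    range_header = request.headers.get("range")  # e.g. "bytes=1024-"
    if range_header and range_header.startswith("bytes="):
        start = int(range_header[len("bytes="):].split("-")[0] or 0)

    async def file_sender():
        async with aiofiles.open(file_path, "rb") as f:
            await f.seek(start)  # resume from the requested offset
            while chunk := await f.read(8192):
                yield chunk

    headers = {
        "Accept-Ranges": "bytes",
        "Content-Length": str(file_size - start),
    }
    if start:
        headers["Content-Range"] = f"bytes {start}-{file_size - 1}/{file_size}"

    return response.stream(
        file_sender(),
        content_type="application/octet-stream",
        status_code=206 if start else 200,  # 206 Partial Content when resuming
        headers=headers,
    )
```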
## Performance Considerations
- **Chunk Size**: Choose an appropriate chunk size (typically 4KB-16KB) for your use case.
- **Backpressure**: Be mindful of slow clients and implement backpressure handling.
- **Timeouts**: Set appropriate timeouts for long-running streams.
- **Connection Pooling**: Reuse connections for better performance.
- **Compression**: Consider compressing text-based streams, as in the sketch below.
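On the compression point, Python's built-in `zlib` can gzip a stream incrementally without buffering the whole payload. A hedged sketch (the route path is illustrative, and note that proxies sometimes buffer compressed responses, which can defeat streaming):

```python
import zlib

@app.get("/compressed-stream")
async def compressed_stream(request, response):
    async def generate():
        # wbits=31 selects the gzip container, which browsers decode natively
        compressor = zlib.compressobj(wbits=31)
        for i in range(1000):
            # compressobj buffers internally, so compress() may return b""
            if chunk := compressor.compress(f"Row {i}\n".encode()):
                yield chunk
        yield compressor.flush()  # emit buffered data plus the gzip trailer

    return response.stream(
        generate(),
        content_type="text/plain",
        headers={"Content-Encoding": "gzip"},
    )
```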
## Understanding Error Handling in Streams
Error handling in streaming responses requires special consideration because the response is sent incrementally. Unlike regular HTTP responses where you can return an error status code at the beginning, with streaming, you need to handle errors that might occur mid-stream.
**Key Considerations for Error Handling:**

- **Immediate vs. Graceful Errors**: Some errors (like invalid authentication) should fail immediately, while others (like data processing errors) might allow for graceful degradation.
- **Client-Side Handling**: Clients must be prepared to handle partial responses and error conditions that occur mid-stream.
- **Resource Cleanup**: Ensure all resources (files, database connections) are properly cleaned up if an error occurs.
- **Error Signaling**: Decide how to signal errors to the client (e.g., special error messages, HTTP trailers).
### Implementing Error Handling
Here's how to implement robust error handling in a streaming endpoint:
@app.get("/safe-stream")
async def safe_stream(request, response):
async def generate():
try:
for i in range(10):
if i == 5:
raise ValueError("Simulated error")
yield f"Data {i}\n"
await asyncio.sleep(0.5)
except Exception as e:
# Log the full error for debugging
app.logger.error(f"Error in safe_stream: {str(e)}")
# Send a user-friendly error message to the client
yield f"Error: {str(e)}\n"
finally:
# Always send completion message
yield "Stream completed\n"
# Any cleanup code would go here
return response.stream(generate())
## Building a Real-time Data Pipeline
A real-time data pipeline processes and delivers data as it's generated, making it ideal for analytics, monitoring, and live dashboards. This example demonstrates how to build such a pipeline using Nexios streaming capabilities.
### Pipeline Architecture
- **Data Ingestion**: Continuously fetch data from a source
- **Processing**: Transform or analyze the data
- **Delivery**: Stream processed data to clients in real-time
- **Monitoring**: Track pipeline health and performance
### Implementation Details
This example shows a complete data pipeline that:
- Streams data in real-time
- Handles errors gracefully
- Provides status updates
- Uses newline-delimited JSON for easy parsing
```python
import asyncio
import json
from datetime import datetime, timezone

@app.get("/data-pipeline")
async def data_pipeline(request, response):
    """
    Real-time data processing pipeline endpoint.
    Streams processed data as it becomes available.
    """
    async def process_data():
        try:
            # Initial status update
            yield json.dumps({
                "status": "starting",
                "timestamp": datetime.now(timezone.utc).isoformat()
            }) + "\n"

            # Process data in chunks; fetch_data_chunks() and process_chunk()
            # are your own ingestion and processing hooks
            async for chunk in fetch_data_chunks():
                try:
                    # Process each chunk asynchronously
                    processed = await process_chunk(chunk)

                    # Yield the processed data
                    yield json.dumps({
                        "status": "processing",
                        "data": processed,
                        "timestamp": datetime.now(timezone.utc).isoformat()
                    }) + "\n"

                    # Simulate work (remove in production)
                    await asyncio.sleep(0.1)
                except Exception as chunk_error:
                    # Log chunk processing error but continue with next chunk
                    app.logger.error(f"Error processing chunk: {chunk_error}")
                    yield json.dumps({
                        "status": "chunk_error",
                        "error": str(chunk_error),
                        "timestamp": datetime.now(timezone.utc).isoformat()
                    }) + "\n"
        except Exception as e:
            # Handle fatal errors
            app.logger.error(f"Pipeline error: {e}")
            yield json.dumps({
                "status": "error",
                "message": str(e),
                "timestamp": datetime.now(timezone.utc).isoformat()
            }) + "\n"
        finally:
            # Always send completion message
            yield json.dumps({
                "status": "complete",
                "timestamp": datetime.now(timezone.utc).isoformat()
            }) + "\n"

    # Return the streaming response with the appropriate content type
    return response.stream(
        process_data(),
        content_type="application/x-ndjson"  # Newline-delimited JSON
    )
```
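The pipeline leaves `fetch_data_chunks()` and `process_chunk()` undefined; they stand in for your real ingestion source and transformation step. Purely hypothetical stand-ins might look like this:

```python
# Illustrative placeholders only -- replace with your real data source
# and processing logic.
async def fetch_data_chunks():
    for i in range(10):
        await asyncio.sleep(0.05)  # pretend to wait on an upstream source
        yield {"reading": i}

async def process_chunk(chunk):
    return {**chunk, "squared": chunk["reading"] ** 2}
```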
### Client-Side Processing
Clients can consume this stream with any HTTP client that reads the body incrementally, such as `aiohttp` in Python or the Fetch API's streaming reader in the browser. (The browser's `EventSource` API only understands `text/event-stream`, so it won't parse NDJSON.) The newline-delimited JSON format makes it easy to parse each message individually as it arrives.
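For example, a Python consumer built on `aiohttp` might look like this (the localhost URL is an assumption):

```python
import asyncio
import json
import aiohttp

async def consume():
    async with aiohttp.ClientSession() as session:
        async with session.get("http://localhost:8000/data-pipeline") as resp:
            async for line in resp.content:  # StreamReader yields line by line
                if line.strip():
                    message = json.loads(line)
                    print(message["status"], message.get("data"))

asyncio.run(consume())
```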