Integrating Events with WebSockets in Nexios
Nexios combines WebSockets with a powerful event system to create reactive, real-time applications. This integration lets different parts of your system communicate seamlessly while keeping code clean and maintainable.
Basic WebSocket Event Integration
Emitting Events from Connections
WebSocket handlers can trigger events that other components can react to:
python
@app.ws_route("/chat")
async def chat_handler(ws: WebSocket):
await ws.accept()
await app.events.emit("ws.connected", {"client": ws.client}) # Connection event
try:
while True:
message = await ws.receive_json()
await app.events.emit("chat.message", message) # Message event
except Exception as e:
await app.events.emit("ws.error", {"error": str(e)}) # Error event
Reacting to Events in Handlers
Subscribe to events and push updates to connected clients:
python
@app.events.on("notification.created")
async def push_notification(notification):
await ChannelBox.group_send(
group_name="notifications",
payload=notification
)
Advanced Integration Patterns
Namespaced WebSocket Events
Create isolated event spaces for better organization:
python
ws_events = app.events.namespace("websocket")
@ws_events.on("message.received")
async def process_message(msg):
print(f"New message: {msg['content']}")
# In handler:
await ws_events.emit("message.received", {"content": "Hello"})
One-Time Connection Events
Execute actions only when a connection first starts:
python
@app.events.once("connection.init")
async def send_welcome(data):
await data["ws"].send_json({"type": "welcome"})
# During connection:
await app.events.emit("connection.init", {"ws": ws})
Error Handling with Events
Centralize error management through events:
python
@app.events.on("ws.error")
async def handle_errors(error):
logging.error(f"WebSocket failure: {error}")
# Alert monitoring systems
# In handlers:
try:
...
except Exception as e:
await app.events.emit("ws.error", {"error": str(e)})
Complete Chat Application Example
python
@app.ws_route("/chat/{room}")
async def chat_room(ws: WebSocket):
room = ws.path_params["room"]
channel = Channel(websocket=ws)
await ChannelBox.add_channel_to_group(channel, f"chat_{room}")
try:
while True:
msg = await ws.receive_json()
await app.events.emit("room.message", {
"room": room,
"message": msg
})
finally:
await ChannelBox.remove_channel_from_group(channel, f"chat_{room}")
@app.events.on("room.message")
async def broadcast(msg):
await ChannelBox.group_send(
group_name=f"chat_{msg['room']}",
payload=msg
)