Real-time features have become a baseline expectation for modern web applications. Users expect instant updates, live notifications, and seamless interaction without page reloads. WebSockets address this need by providing a full-duplex communication channel between clients and servers.

FastAPI, a modern, high-performance web framework for building APIs with Python, offers excellent support for WebSockets.

In this article, we’ll look at how to use FastAPI and WebSockets to build robust real-time features and notifications in your applications.

Understanding WebSockets

Before we dive into the implementation details, let’s briefly recap what WebSockets are and why they’re crucial for real-time communication.

WebSockets are a protocol that enables two-way communication between a client (typically a web browser) and a server over a single TCP connection. Unlike traditional HTTP requests, which follow a request-response model, WebSockets allow for continuous, real-time data exchange without the need for polling or long-polling techniques.
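A WebSocket connection begins as an ordinary HTTP request that is then upgraded to the WebSocket protocol. As a rough illustration of that handshake, the sketch below computes the `Sec-WebSocket-Accept` value a server returns for a client's `Sec-WebSocket-Key`, as defined in RFC 6455. The function name `accept_key` is made up for this example; FastAPI and the underlying ASGI server perform this step for you.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake.
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def accept_key(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept header value for a client key."""
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


# Sample key taken from the handshake example in RFC 6455.
print(accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

Once the server replies with this value, the same TCP connection carries WebSocket frames in both directions.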

Key benefits of WebSockets include:

  1. Low latency: Messages travel over an already-open connection, so there is no per-message connection setup.
  2. Reduced server load: No need for constant polling.
  3. Bidirectional communication: Both client and server can initiate data transfer.
  4. Efficient for real-time applications: Ideal for chat apps, live updates, and notifications.

Setting up FastAPI for WebSockets

Let’s start by setting up a basic FastAPI application with WebSocket support. First, ensure you have FastAPI and its dependencies installed:


pip install "fastapi[all]"

Now, let’s create a simple FastAPI application with a WebSocket endpoint:


from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message received: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

This basic setup creates a WebSocket endpoint at /ws. When a client connects, the server accepts the connection and enters a loop, receiving messages and echoing them back to the client.

Managing WebSocket connections

In a real-world application, you’ll likely need to manage multiple WebSocket connections. Let’s create a connection manager to handle this:


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

app = FastAPI()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Client {client_id}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client {client_id} left the chat")

This implementation allows multiple clients to connect and broadcasts messages to all connected clients.
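One thing to watch for: if a single send fails during a broadcast, the exception interrupts delivery to the remaining clients. A possible refinement, sketched below, is to collect failed connections and remove them after the loop. `FakeSocket` and `SafeConnectionManager` are hypothetical names used only for this sketch; `FakeSocket` stands in for `fastapi.WebSocket` so the example runs without a server.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for fastapi.WebSocket, used only in this sketch."""

    def __init__(self, name, broken=False):
        self.name = name
        self.broken = broken
        self.sent = []

    async def send_text(self, message):
        if self.broken:
            raise RuntimeError(f"{self.name} is no longer connected")
        self.sent.append(message)


class SafeConnectionManager:
    def __init__(self):
        self.active_connections = []

    def add(self, websocket):
        self.active_connections.append(websocket)

    async def broadcast(self, message):
        dead = []
        # Iterate over a copy so the list can change while we await.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                dead.append(connection)
        # Drop connections whose send failed.
        for connection in dead:
            if connection in self.active_connections:
                self.active_connections.remove(connection)


async def demo():
    manager = SafeConnectionManager()
    alice = FakeSocket("alice")
    bob = FakeSocket("bob", broken=True)
    manager.add(alice)
    manager.add(bob)
    await manager.broadcast("hello")
    return manager, alice, bob


manager, alice, bob = asyncio.run(demo())
print(alice.sent, len(manager.active_connections))  # ['hello'] 1
```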

Implementing real-time notifications

Now that we have a basic WebSocket setup, let’s implement a real-time notification system. We’ll create a simple task management system where users can create tasks, and other users will receive real-time notifications when new tasks are added.

First, let’s define our data models:


from fastapi import WebSocket
from pydantic import BaseModel
from typing import List, Dict
from datetime import datetime

class Task(BaseModel):
    id: int
    title: str
    description: str
    created_at: datetime

class NotificationManager:
    def __init__(self):
        self.subscriptions: Dict[str, List[WebSocket]] = {}

    async def subscribe(self, topic: str, websocket: WebSocket):
        if topic not in self.subscriptions:
            self.subscriptions[topic] = []
        self.subscriptions[topic].append(websocket)

    def unsubscribe(self, topic: str, websocket: WebSocket):
        # Ignore sockets that were never subscribed or were already removed
        if websocket in self.subscriptions.get(topic, []):
            self.subscriptions[topic].remove(websocket)

    async def notify(self, topic: str, message: str):
        if topic in self.subscriptions:
            for websocket in self.subscriptions[topic]:
                await websocket.send_text(message)

notification_manager = NotificationManager()

Now, let’s update our FastAPI application to include task creation and real-time notifications:


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict
from datetime import datetime

app = FastAPI()

tasks: List[Task] = []

@app.post("/tasks")
async def create_task(task: Task):
    task.created_at = datetime.now()
    tasks.append(task)
    await notification_manager.notify("tasks", f"New task created: {task.title}")
    return {"message": "Task created successfully"}

@app.get("/tasks")
async def get_tasks():
    return tasks

@app.websocket("/ws/notifications/{topic}")
async def websocket_endpoint(websocket: WebSocket, topic: str):
    await websocket.accept()
    await notification_manager.subscribe(topic, websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        notification_manager.unsubscribe(topic, websocket)

In this implementation, clients can subscribe to notifications for specific topics (in this case, “tasks”). When a new task is created, all subscribed clients receive a real-time notification.
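The notifications above are plain strings. Clients usually find structured messages easier to route, so one option is to wrap each event in a small JSON envelope. The sketch below assumes a hypothetical helper `build_notification` and illustrative field names (`type`, `payload`, `sent_at`); none of these come from FastAPI.

```python
import json
from datetime import datetime, timezone


def build_notification(event_type: str, payload: dict) -> str:
    """Serialize an event as a JSON text message (field names are illustrative)."""
    envelope = {
        "type": event_type,
        "payload": payload,
        "sent_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(envelope)


message = build_notification("task_created", {"id": 1, "title": "Write docs"})
decoded = json.loads(message)
print(decoded["type"], decoded["payload"]["title"])  # task_created Write docs
```

The resulting string could be passed to `notification_manager.notify("tasks", message)` in place of the formatted text, and the client can branch on the `type` field.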

Handling authentication and authorization

In a production environment, you’ll want to secure your WebSocket connections. FastAPI makes it easy to integrate authentication and authorization with WebSockets. Here’s an example using JWT tokens:


from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel

# ... (previous imports and code)

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str

def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid authentication credentials")
        return User(username=username)
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")

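@app.websocket("/ws/secure/{topic}")
async def websocket_endpoint(websocket: WebSocket, topic: str):
    # Browsers cannot set an Authorization header on a WebSocket handshake, and
    # OAuth2PasswordBearer is written for HTTP requests, so this sketch reads
    # the token from the query string instead: /ws/secure/tasks?token=...
    token = websocket.query_params.get("token", "")
    try:
        user = get_current_user(token)
    except HTTPException:
        await websocket.close(code=1008)  # 1008 = policy violation
        return
    await websocket.accept()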
    await notification_manager.subscribe(topic, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await notification_manager.notify(topic, f"{user.username}: {data}")
    except WebSocketDisconnect:
        notification_manager.unsubscribe(topic, websocket)

This setup rejects connections that do not present a valid JWT, so only authenticated users can subscribe to the WebSocket endpoint and receive notifications.
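To make the token check less of a black box, here is a standard-library sketch of what verifying an HS256 signature involves: recompute the HMAC-SHA256 over the header and payload segments and compare it with the signature segment. The helpers `sign_hs256` and `verify_hs256` are hypothetical and deliberately incomplete (they do not check expiry or the `alg` header), so treat this as an illustration and keep using a maintained JWT library in real code.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: str):
    """Return the payload if the signature matches, otherwise None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
        expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
        # compare_digest avoids leaking timing information.
        if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
            return None
        return json.loads(b64url_decode(payload_b64))
    except (ValueError, UnicodeError):
        return None


token = sign_hs256({"sub": "alice"}, "your-secret-key")
print(verify_hs256(token, "your-secret-key"))  # {'sub': 'alice'}
print(verify_hs256(token, "wrong-key"))        # None
```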

Scaling WebSocket applications

As your application grows, you may need to scale your WebSocket implementation. Here are some strategies to consider:

  1. Redis Pub/Sub: Use Redis as a message broker to relay messages between server instances, so a message published on one instance reaches clients connected to another.
  2. Load Balancing: Make sure the load balancer supports WebSocket upgrades, and use sticky sessions if reconnecting clients need to return to the same server.
  3. Horizontal Scaling: Run multiple application instances and back them with a managed Redis service such as AWS ElastiCache or Azure Cache for Redis as the shared broker.

Here’s a basic example of using Redis for pub/sub with FastAPI and WebSockets:


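# Note: this example uses the aioredis 1.x API (pip install "aioredis<2").
# aioredis has since been merged into redis-py as redis.asyncio.
import aioredis
from fastapi import FastAPI, WebSocket

app = FastAPI()

async def get_redis():
    redis = await aioredis.create_redis_pool("redis://localhost")
    return redis

@app.websocket("/ws/redis/{channel}")
async def websocket_endpoint(websocket: WebSocket, channel: str):
    await websocket.accept()
    redis = await get_redis()
    try:
        # subscribe() returns a list of Channel objects; keep that separate
        # from the `channel` path parameter so unsubscribe() gets the name
        (subscription,) = await redis.subscribe(channel)
        async for message in subscription.iter():
            await websocket.send_text(message.decode())
    finally:
        await redis.unsubscribe(channel)
        redis.close()
        await redis.wait_closed()

@app.post("/publish/{channel}")
async def publish(channel: str, message: str):
    redis = await get_redis()
    try:
        await redis.publish(channel, message)
    finally:
        # Close the pool so each request does not leak a connection
        redis.close()
        await redis.wait_closed()
    return {"message": "Published"}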

This implementation allows you to scale your WebSocket application across multiple instances while maintaining real-time communication.
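The example above opens one Redis connection per WebSocket. A common alternative is for each server instance to hold a single subscription and fan messages out to its local sockets. The sketch below simulates that pattern with the standard library only: an `asyncio.Queue` stands in for the Redis subscription, and `LocalFanout` and `FakeSocket` are hypothetical names invented for the illustration.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for fastapi.WebSocket, used only in this sketch."""

    def __init__(self):
        self.sent = []

    async def send_text(self, message):
        self.sent.append(message)


class LocalFanout:
    """Delivers messages from one upstream source to many local connections."""

    def __init__(self):
        self.connections = set()

    async def run(self, upstream: asyncio.Queue) -> None:
        while True:
            message = await upstream.get()
            if message is None:  # sentinel used only in this sketch to stop the loop
                break
            for connection in list(self.connections):
                await connection.send_text(message)


async def demo():
    upstream = asyncio.Queue()  # stands in for a single Redis channel subscription
    fanout = LocalFanout()
    first, second = FakeSocket(), FakeSocket()
    fanout.connections.update({first, second})

    reader = asyncio.create_task(fanout.run(upstream))
    await upstream.put("deploy finished")
    await upstream.put(None)
    await reader
    return first.sent, second.sent


sent_first, sent_second = asyncio.run(demo())
print(sent_first, sent_second)  # ['deploy finished'] ['deploy finished']
```

With this shape, the number of Redis connections grows with the number of instances and channels rather than with the number of connected clients.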

Performance considerations

When working with WebSockets in FastAPI, keep these performance tips in mind:

  1. Connection Limits: Set appropriate limits on the number of concurrent WebSocket connections your server can handle.
  2. Message Size: Implement message size limits to prevent potential DoS attacks.
  3. Heartbeats: Implement periodic heartbeats to keep connections alive and detect disconnects early.
  4. Compression: Use WebSocket compression to reduce bandwidth usage for large payloads.
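For the message size point, an application-level check can look like the following sketch. The limit and the helper name `is_within_limit` are assumptions chosen for illustration. Note that this check runs after the message has already been received, so a server-level limit (for example, Uvicorn's `--ws-max-size` option) is still the first line of defense.

```python
MAX_MESSAGE_BYTES = 4096  # illustrative limit; tune for your payloads


def is_within_limit(message: str, max_bytes: int = MAX_MESSAGE_BYTES) -> bool:
    """Check the UTF-8 encoded size, which is what travels over the wire."""
    return len(message.encode("utf-8")) <= max_bytes


print(is_within_limit("hello"))      # True
print(is_within_limit("x" * 5000))   # False
print(is_within_limit("é" * 3000))   # False: 6000 bytes once encoded as UTF-8
```

Inside a handler, a message that fails the check could be answered by closing the connection with code 1009, which RFC 6455 reserves for messages that are too big.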

Here’s an example of implementing heartbeats and connection limits:


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio

app = FastAPI()

MAX_CONNECTIONS = 1000
HEARTBEAT_INTERVAL = 30  # seconds

class ConnectionManager:
    def __init__(self):
        self.active_connections = set()

    async def connect(self, websocket: WebSocket) -> bool:
        if len(self.active_connections) >= MAX_CONNECTIONS:
            await websocket.close(code=1008)  # Connection limit exceeded
            return False
        await websocket.accept()
        self.active_connections.add(websocket)
        return True

    def disconnect(self, websocket: WebSocket):
        # discard() does not raise if the socket was already removed
        self.active_connections.discard(websocket)

    async def send_heartbeat(self, websocket: WebSocket):
        while True:
            try:
                await asyncio.sleep(HEARTBEAT_INTERVAL)
                await websocket.send_text("heartbeat")
            except (WebSocketDisconnect, RuntimeError):
                # The send failed, so the connection is gone
                break

manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # Stop here if the connection was rejected by the limit check
    if not await manager.connect(websocket):
        return
    heartbeat_task = asyncio.create_task(manager.send_heartbeat(websocket))
    try:
        while True:
            data = await websocket.receive_text()
            # Process received data
    except WebSocketDisconnect:
        pass
    finally:
        heartbeat_task.cancel()
        manager.disconnect(websocket)

This implementation includes connection limits and heartbeats to maintain stable and efficient WebSocket connections.
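Server-sent heartbeats only reveal a dead connection when a send fails. A complementary approach is to put a timeout on receives and treat a silent client as gone. The sketch below shows the idea with `asyncio.wait_for`; `FakeSocket`, `read_until_idle`, and the very short `IDLE_TIMEOUT` are assumptions made so the example runs quickly without a server.

```python
import asyncio

IDLE_TIMEOUT = 0.05  # seconds; deliberately tiny so the demo finishes quickly


class FakeSocket:
    """Hypothetical stand-in for fastapi.WebSocket, used only in this sketch."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def receive_text(self) -> str:
        if self._messages:
            return self._messages.pop(0)
        await asyncio.sleep(3600)  # simulate a client that has gone silent
        return ""


async def read_until_idle(websocket, timeout: float = IDLE_TIMEOUT):
    """Collect messages until the client stays silent longer than `timeout`."""
    received = []
    while True:
        try:
            message = await asyncio.wait_for(websocket.receive_text(), timeout)
        except asyncio.TimeoutError:
            break  # no message arrived in time; treat the client as idle
        received.append(message)
    return received


received = asyncio.run(read_until_idle(FakeSocket(["ping", "hello"])))
print(received)  # ['ping', 'hello']
```

In a real endpoint the timeout would typically be a small multiple of the heartbeat interval, and the handler would close the connection when the timeout fires.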

Conclusion

FastAPI’s support for WebSockets provides a powerful foundation for building real-time features and notifications in your web applications. By using WebSockets, you can create responsive, efficient, and scalable systems that meet the demands of modern users.

In this article, we’ve covered:

  1. Setting up FastAPI for WebSockets
  2. Managing multiple WebSocket connections
  3. Implementing real-time notifications
  4. Handling authentication and authorization
  5. Scaling WebSocket applications
  6. Performance considerations

As you continue to build and refine your real-time applications, remember to consider factors such as security, scalability, and performance. With FastAPI and WebSockets, you have the tools to create robust, high-performance real-time features that will delight your users and set your applications apart.

Last Update: 08/09/2024