WebSockets
FastAPI Views provides WebSocketAPIView — a class-based view for WebSocket endpoints with built-in Pydantic validation, connection tracking, broadcast helpers, and clean disconnect handling.
WebSocketAPIView
Subclass WebSocketAPIView and implement the handler async method. The view automatically accepts the connection, deserializes incoming binary frames using the message_schema, and closes the connection on disconnect.
from pydantic import BaseModel
from fastapi import FastAPI
from fastapi_views import ViewRouter, configure_app
from fastapi_views.views.websockets import WebSocketAPIView
class ChatMessage(BaseModel):
user: str
text: str
class ChatReply(BaseModel):
text: str
class ChatView(WebSocketAPIView):
name = "chat"
message_schema = ChatMessage
async def handler(self) -> None:
async for message in self.messages:
reply = ChatReply(text=f"[{message.user}] {message.text}")
await self.send(reply)
router = ViewRouter()
router.register_websocket_view(ChatView, prefix="/ws/chat")
app = FastAPI()
app.include_router(router)
configure_app(app)
Type parameters
WebSocketAPIView is generic over three type variables:
| Variable | Description |
|---|---|
P |
ParamSpec for the handler method's extra dependencies |
S |
The send schema — the type passed to send / broadcast |
R |
The receive schema — the type yielded by self.messages |
For most cases you can leave the generics implicit and rely on message_schema.
Sending and broadcasting
| Method | Description |
|---|---|
await self.send(message) |
Serialize and send to the current connection |
await self.broadcast(message) |
Serialize and send to all active connections of this view class |
Both methods use a cached Pydantic TypeAdapter keyed on the return type of get_serializer("send"). Disconnected clients are silently skipped during broadcast.
Receiving messages
self.messages is an AsyncIterable that yields validated, deserialized messages:
async def handler(self) -> None:
async for message in self.messages:
# message is already a validated `message_schema` instance
await self.broadcast(message)
Incoming bytes are validated with message_schema (or the schema returned by get_message_schema("receive")). A ValidationError or WebSocketDisconnect cancels the receive loop and triggers cleanup.
Connection lifecycle hooks
Override on_connect and on_disconnect to run logic when a client connects or disconnects:
class RoomView(WebSocketAPIView):
message_schema = ChatMessage
async def on_connect(self) -> None:
print(f"New connection, total: {len(self._connections)}")
async def on_disconnect(self) -> None:
print("Client left")
async def handler(self) -> None:
async for message in self.messages:
await self.broadcast(message)
on_connect is called before the internal receive loop starts. on_disconnect is called after the connection is removed from _connections and the socket is closed, with a configurable timeout (default 30 s) to allow graceful cleanup.
Per-class connection tracking
_connections is a class-level list of all active WebSocket objects for that view class. Use it to inspect or act on connected clients:
class StatsView(WebSocketAPIView):
...
async def handler(self) -> None:
async for _ in self.messages:
count = len(self._connections)
await self.send(StatusMessage(online=count))
FastAPI dependency injection
Path and query parameters, as well as Depends(...) dependencies, are supported through the handler signature:
from fastapi import Depends
async def get_current_user(token: str) -> str:
return token # simplified
class AuthenticatedView(WebSocketAPIView):
message_schema = ChatMessage
async def handler(self, user: str = Depends(get_current_user)) -> None:
async for message in self.messages:
if message.user == user:
await self.send(ChatReply(text=message.text))
Customising serialization
Override get_message_schema to use different schemas for send and receive, or to vary them based on the action:
class TypedView(WebSocketAPIView):
@classmethod
def get_message_schema(cls, action):
if action == "receive":
return IncomingMessage
return OutgoingMessage
async def handler(self) -> None:
async for msg in self.messages:
await self.send(OutgoingMessage(result=msg.value * 2))
Disconnect timeout
disconnect_timeout (default 30) is the number of seconds the cleanup block in on_disconnect is allowed to run before being cancelled. Increase it if your disconnect logic involves slow I/O:
class SlowCleanupView(WebSocketAPIView):
disconnect_timeout = 60
async def on_disconnect(self) -> None:
await flush_session_to_db(self.websocket)
async def handler(self) -> None:
async for message in self.messages:
await self.send(message)
Connecting from a browser
const ws = new WebSocket("ws://localhost:8000/ws/chat");
ws.onopen = () => {
ws.send(JSON.stringify({ user: "alice", text: "hello" }));
};
ws.onmessage = (event) => {
const reply = JSON.parse(event.data);
console.log(reply.text);
};
ws.onclose = () => console.log("disconnected");
Messages are sent and received as binary frames (UTF-8 encoded JSON bytes).
Complete example
from typing import ClassVar
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from fastapi_views import ViewRouter, configure_app
from fastapi_views.views.websockets import WebSocketAPIView
class ChatMessage(BaseModel):
user: str
text: str
class ChatReply(BaseModel):
text: str
echo: bool = False
class ChatView(WebSocketAPIView[ChatMessage, ChatReply]):
"""Simple echo chat — broadcasts every received message back to all connections."""
name = "chat"
message_schema = ChatMessage
# Per-class state: map of username -> connection count (illustrative)
online: ClassVar[dict[str, int]] = {}
async def on_connect(self) -> None:
self.logger.info("Client connected, total=%d", len(self._connections))
async def on_disconnect(self) -> None:
self.logger.info("Client disconnected, total=%d", len(self._connections))
async def handler(self, api_key: str | None = Depends()) -> None:
if api_key is None:
return
async for message in self.messages:
reply = ChatReply(text=f"[{message.user}] {message.text}", echo=True)
await self.broadcast(reply)
router = ViewRouter()
router.register_websocket_view(ChatView, prefix="/ws/chat")
app = FastAPI(title="WebSocket Chat Example")
app.include_router(router)
configure_app(app)