WebSocket Hub
Real-time chat for events runs over WebSockets. The Hub is the per-pod broker
that owns local connections; cross-pod fanout flows through Redis pub/sub on
the chat:event:{eventId} channel. Source:
backend/internal/websocket/hub.go, integrated by
backend/internal/handlers/chat_handler.go.
Architecture
```
Hub
├── rooms: map[EventID] -> Room
│   ├── Clients: set[*Client]
│   └── mutex
├── Register chan *Client
├── Unregister chan *Client
├── Broadcast chan *BroadcastMessage (buffered, 256)
├── rdb redis.UniversalClient (cross-pod pub/sub)
└── podID uuid.UUID (origin tag for dedup)
```
- A `Hub` owns the room registry and a `Run()` goroutine that serialises register/unregister/broadcast operations. It also starts a long-lived `PSUBSCRIBE chat:event:*` goroutine when `rdb != nil`.
- A `Room` is bound 1:1 to an event UUID: every connected client joins exactly one room.
- A `Client` wraps a `*websocket.Conn`, a buffered `Send` channel, and pointers back to the hub and its room.
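The register path of the Hub can be sketched as a single goroutine that selects on channels. This is a minimal, dependency-free sketch and not the code in hub.go. EventID is a plain string instead of a UUID, and Client carries only the fields needed here. The stop channel, the done parameter and NewHub are hypothetical additions that let the loop shut down. Unregister, Broadcast and the Redis subscriber are left out.

```go
package main

import "sync"

// EventID stands in for the event UUID (assumption: a string keeps the sketch dependency-free).
type EventID string

// Client is trimmed to what this sketch needs; the real one also wraps a *websocket.Conn.
type Client struct {
	EventID EventID
	Send    chan []byte
}

// Room guards its client set with a mutex, mirroring the structure above.
type Room struct {
	mu      sync.Mutex
	Clients map[*Client]bool
}

// Hub holds the registry; stop is a hypothetical shutdown hook for this sketch.
type Hub struct {
	rooms    map[EventID]*Room
	Register chan *Client
	stop     chan struct{}
}

// NewHub is a hypothetical constructor.
func NewHub() *Hub {
	return &Hub{
		rooms:    make(map[EventID]*Room),
		Register: make(chan *Client),
		stop:     make(chan struct{}),
	}
}

// Run is the only goroutine that reads or writes h.rooms, so registry
// operations are serialised by the select loop rather than by a lock.
func (h *Hub) Run(done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case c := <-h.Register:
			room, ok := h.rooms[c.EventID]
			if !ok { // create the room on the first join for this event
				room = &Room{Clients: make(map[*Client]bool)}
				h.rooms[c.EventID] = room
			}
			room.mu.Lock()
			room.Clients[c] = true
			room.mu.Unlock()
		case <-h.stop:
			return
		}
	}
}
```

In this sketch, serialisation comes from Run being the single owner of the registry map. The per-room mutex only matters if other goroutines read a room's client set.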
There is one Hub per pod. It starts inside main.go as
go app.Hub.Run() whenever the process runs in a mode that serves WS
(full, multi-hub, ws-hub). The api-hub and async modes skip Hub
startup entirely.
Process modes and the Hub
The single tomoda-backend binary chooses what to start based on the
--mode flag (or SERVER_MODE env). The matrix:
| Mode | Hub starts? | HTTP /api/v1? | /ws/chat/:eventId? | Async (worker+scheduler)? |
|---|---|---|---|---|
| full | ✓ | ✓ | ✓ | ✓ |
| multi-hub | ✓ | ✓ | ✓ | |
| async | | | | ✓ |
| api-hub | | ✓ | | |
| ws-hub | ✓ | | ✓ | |
api-hub and ws-hub are reserved for the future api/ws deployment split —
see the ws-split design doc in the devops repo. Local dev defaults to
full. In production today the API deployment runs multi-hub and the
async deployment runs async.
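The matrix can also be read as a lookup from mode to components. This encoding is hypothetical, since the page does not show how main.go represents it. Components and componentsFor are illustrative names. The rows follow the table and the prose above, so the Hub starts exactly in the modes that serve /ws/chat/:eventId.

```go
package main

import "fmt"

// Components records what one --mode value starts (hypothetical type).
type Components struct {
	Hub   bool // websocket Hub and its Run() goroutine
	API   bool // HTTP /api/v1
	WS    bool // /ws/chat/:eventId
	Async bool // worker + scheduler
}

// modeMatrix encodes the process-mode table above.
var modeMatrix = map[string]Components{
	"full":      {Hub: true, API: true, WS: true, Async: true},
	"multi-hub": {Hub: true, API: true, WS: true},
	"async":     {Async: true},
	"api-hub":   {API: true},
	"ws-hub":    {Hub: true, WS: true},
}

// componentsFor resolves a mode string, rejecting unknown values.
func componentsFor(mode string) (Components, error) {
	c, ok := modeMatrix[mode]
	if !ok {
		return Components{}, fmt.Errorf("unknown mode %q", mode)
	}
	return c, nil
}
```

A check that Hub equals WS for every row is a cheap guard against a future mode that serves WebSockets without starting the Hub.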
Connection lifecycle
```mermaid
sequenceDiagram
participant Browser
participant Gin
participant JWTAuth
participant ChatHandler
participant Hub
participant Room
Browser->>Gin: GET /ws/chat/:eventId<br/>(Sec-WebSocket-Key, Auth header or ?token=)
Gin->>JWTAuth: validate JWT
JWTAuth-->>Gin: userID
Gin->>ChatHandler: HandleWebSocket
ChatHandler->>Hub: Register <- client
Hub->>Room: rooms[eventId] (create if missing)
Hub->>Room: room.Clients[client] = true
par read loop
ChatHandler->>Browser: ReadPump (handles ping/text)
and write loop
ChatHandler->>Browser: WritePump (drains Send, pings every 54s)
end
```
Browser->>ChatHandler: { type: "send_message", data: ... }
ChatHandler->>Hub: Broadcast <- BroadcastMessage
Hub->>Room: fan out to all clients
Room->>Browser: forward to other participants
Authentication
The WebSocket upgrade endpoint sits behind middleware.JWTAuth(authService):
```go
ws.GET("/chat/:eventId", middleware.JWTAuth(app.AuthService),
	app.Handlers.ChatHandler.HandleWebSocket)
```
The token is read from the Authorization header. The :eventId path parameter is bound to the client's EventID field at upgrade time and never changes for the lifetime of the connection — one socket, one room.
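Reading the token from the Authorization header usually means stripping a Bearer prefix. The helper below is hypothetical and assumes the RFC 6750 "Bearer <token>" form. It does not reflect how middleware.JWTAuth actually parses or validates the JWT. It also does not cover the ?token= query fallback that the connection diagram mentions.

```go
package main

import "strings"

// bearerToken extracts the token from an Authorization header value.
// Assumption: the "Bearer <token>" scheme, matched case-insensitively.
func bearerToken(authorization string) (string, bool) {
	const prefix = "Bearer "
	if len(authorization) < len(prefix) || !strings.EqualFold(authorization[:len(prefix)], prefix) {
		return "", false
	}
	tok := strings.TrimSpace(authorization[len(prefix):])
	return tok, tok != "" // an empty token is treated as missing
}
```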
Ping / pong
Tunables in hub.go:
```go
const (
	writeWait  = 10 * time.Second
	pongWait   = 60 * time.Second
	pingPeriod = (pongWait * 9) / 10 // 54s
)
```
- The server sends a WebSocket-level ping every 54 seconds.
- The client must reply with a pong within 60 seconds of the last read, or the connection is closed.
- The client may also send an application-level `{ "type": "ping" }` message at any time; the server replies with `{ "type": "pong" }` (see `Client.handleMessage`).
Broadcasting
Hub.Broadcast is a buffered channel (capacity 256). ChatHandler writes a
BroadcastMessage{ EventID, Message, Exclude } onto it. For each message the
Hub does two things in order:
- Local fanout: push to every `Client.Send` channel in the matching room, optionally skipping the `Exclude` client (the sender).
- Remote publish: `PUBLISH chat:event:{eventID}` with a JSON envelope `{opid: <podID>, xuid: <Exclude.UserID|null>, p: <payload>}` so sibling pods can fan out to their local clients.
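The publish side amounts to marshalling the envelope and handing the bytes to PUBLISH. The field types here are assumptions: the pod ID is really a uuid.UUID, and this page does not show the user ID type. buildEnvelope is a hypothetical helper. The payload is passed through as json.RawMessage so it is embedded verbatim rather than double-encoded.

```go
package main

import "encoding/json"

// envelope mirrors the {opid, xuid, p} JSON published on chat:event:{eventID}.
type envelope struct {
	Opid string          `json:"opid"`
	Xuid *string         `json:"xuid"` // nil marshals as null: no excluded user
	P    json.RawMessage `json:"p"`    // payload embedded as-is (must be valid JSON)
}

// buildEnvelope produces the bytes a pod would publish after its local fanout.
func buildEnvelope(podID string, excludeUserID *string, payload []byte) ([]byte, error) {
	return json.Marshal(envelope{Opid: podID, Xuid: excludeUserID, P: payload})
}
```

A nil excluded user becomes "xuid": null, which matches the Exclude.UserID|null slot in the envelope description.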
The subscriber loop on every pod reads from PSUBSCRIBE chat:event:*. On
each incoming envelope it:
- Drops the message if `opid == h.podID` (this pod already delivered it locally before publishing).
- Otherwise calls a UserID-keyed fanout that skips `xuid` if present.
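On the receiving side, the two rules above can be sketched as one function: drop your own pod's echo, and skip the excluded user. localClient and onEnvelope are hypothetical, and the envelope uses assumed string types for the pod and user IDs. The sketch also leaves out the eviction that the real Hub performs when a Send buffer is full.

```go
package main

import "encoding/json"

type envelope struct {
	Opid string          `json:"opid"`
	Xuid *string         `json:"xuid"`
	P    json.RawMessage `json:"p"`
}

// localClient is a hypothetical stand-in for a client connected to this pod.
type localClient struct {
	UserID string
	Send   chan []byte
}

// onEnvelope handles one message read from PSUBSCRIBE chat:event:* and
// returns how many local clients received the payload.
func onEnvelope(selfPod string, raw []byte, room []*localClient) (int, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return 0, err
	}
	if env.Opid == selfPod {
		return 0, nil // this pod already fanned out locally before publishing
	}
	delivered := 0
	for _, c := range room {
		if env.Xuid != nil && c.UserID == *env.Xuid {
			continue // skip every socket belonging to the excluded user
		}
		select {
		case c.Send <- env.P:
			delivered++
		default:
			// Buffer full: the real Hub evicts here; omitted in this sketch.
		}
	}
	return delivered, nil
}
```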
If a client's Send channel is full (default case in the select), the Hub
closes the channel and evicts the client. The corresponding WritePump
goroutine exits, closing the socket. This is the only backpressure mechanism
— it prefers losing slow clients over blocking the broadcast loop.
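The evict-on-full policy is a non-blocking send: a select with a default branch. The client type and the fanout function below are hypothetical minimal stand-ins. In the real Hub this happens per room while it fans out a BroadcastMessage.

```go
package main

// client is a hypothetical minimal client: only the outbound buffer matters here.
type client struct {
	send chan []byte
}

// fanout pushes msg to every client in room except exclude and never blocks.
// A client whose buffer is full is evicted: its channel is closed (so its
// write loop can exit and close the socket) and it is removed from the room.
func fanout(room map[*client]bool, msg []byte, exclude *client) (evicted int) {
	for c := range room {
		if c == exclude {
			continue
		}
		select {
		case c.send <- msg:
		default:
			close(c.send)
			delete(room, c) // deleting the current key during range is safe in Go
			evicted++
		}
	}
	return evicted
}
```

Closing the channel is only safe if every send on it and the close happen on one goroutine or under one lock. A send that races with close panics.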
Reconnect: the subscriber reconnects with a 2-second backoff on transient
Redis errors. Messages published during the gap are lost (Redis pub/sub
gives at-most-once); for chat the message is already persisted via
ChatService.SendMessage, so the client recovers history on reconnect.
Empty rooms
unregisterClient deletes the room from Hub.rooms once it's empty so the map doesn't grow unbounded. This is a tiny but important detail — without it, a process that has handled millions of distinct events would leak per-event memory forever.
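The empty-room rule is a single check after the client is removed. In this hypothetical sketch, rooms are plain maps keyed by a string event ID. The method is meant to run on the Hub's Run goroutine, which is what makes the unlocked map mutation safe.

```go
package main

// client is a hypothetical stand-in; pointer identity is what the room set uses.
type client struct {
	eventID string
}

// hub keeps eventID -> set of local clients.
type hub struct {
	rooms map[string]map[*client]bool
}

// unregisterClient removes c from its room and drops the room once it is empty,
// so h.rooms only holds events that currently have local connections.
func (h *hub) unregisterClient(c *client) {
	room, ok := h.rooms[c.eventID]
	if !ok || !room[c] {
		return // unknown or already removed: nothing to do
	}
	delete(room, c)
	if len(room) == 0 {
		delete(h.rooms, c.eventID) // prevents per-event entries accumulating forever
	}
}
```

The early return makes a repeated unregister for the same client a no-op.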
Message types
Incoming JSON is deserialised into models.WSMessage:
```go
type WSMessage struct {
	Type MessageType `json:"type"`
	Data any         `json:"data"`
}
```
MessageTypePing is handled in the Hub; everything else is forwarded to ChatHandler which decides whether to persist + broadcast (e.g. send_message, react) or drop.
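Dispatch on WSMessage.Type can be sketched as follows. The WSMessage struct is the one shown above. The MessageType string type and the "ping" wire value of MessageTypePing are assumptions based on the { "type": "ping" } message described under Ping / pong. dispatch, reply and forward are hypothetical names, and forward stands in for ChatHandler.

```go
package main

import "encoding/json"

// MessageType is assumed to be a string type.
type MessageType string

// MessageTypePing's value is assumed from the documented { "type": "ping" } frame.
const MessageTypePing MessageType = "ping"

type WSMessage struct {
	Type MessageType `json:"type"`
	Data any         `json:"data"`
}

// dispatch decodes one incoming frame. Pings are answered directly with a pong;
// everything else goes to forward, which decides whether to persist and
// broadcast or drop the message.
func dispatch(raw []byte, reply func([]byte), forward func(WSMessage)) error {
	var msg WSMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return err
	}
	if msg.Type == MessageTypePing {
		reply([]byte(`{"type":"pong"}`))
		return nil
	}
	forward(msg)
	return nil
}
```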
Horizontal scaling
The Hub supports replicas > 1 via the Redis pub/sub fanout described above.
Ingress session affinity is not required: a client can connect to any
pod and still receive messages from senders on other pods.
Per-pod presence
Hub.GetOnlineCount(eventID) returns the count of clients on the
calling pod only. For an event-wide presence number across pods, use
the chat:online:* Redis keys that ChatService already maintains —
those are pod-independent.
For a deeper protocol-level treatment of how the frontend uses this, see Architecture → Real-time.