
WebSocket Hub

Real-time chat for events runs over WebSockets. The Hub is the per-pod broker that owns local connections; cross-pod fanout flows through Redis pub/sub on the chat:event:{eventId} channel. Source: backend/internal/websocket/hub.go, integrated by backend/internal/handlers/chat_handler.go.

Architecture

Hub
 ├── rooms: map[EventID] -> Room
 │                            ├── Clients: set[*Client]
 │                            └── mutex
 ├── Register     chan *Client
 ├── Unregister   chan *Client
 ├── Broadcast    chan *BroadcastMessage   (buffered, 256)
 ├── rdb          redis.UniversalClient   (cross-pod pub/sub)
 └── podID        uuid.UUID               (origin tag for dedup)
  • A Hub owns the room registry and a Run() goroutine that serialises register/unregister/broadcast operations. It also starts a long-lived PSUBSCRIBE chat:event:* goroutine when rdb != nil.
  • A Room is bound 1:1 to an event UUID — every connected client joins exactly one room.
  • A Client wraps a *websocket.Conn, a buffered Send channel, and pointers back to the hub and its room.
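The relationships above can be sketched with stdlib-only Go. This is a minimal sketch, not the code in hub.go. It assumes string event IDs instead of UUIDs and drops the *websocket.Conn, the Redis client and podID so that it compiles without dependencies. registerClient is a hypothetical helper showing what Run would do when a client arrives on Register.

```go
package main

import "sync"

// Client is a trimmed-down stand-in: the real one also wraps a *websocket.Conn
// and holds pointers back to its hub and room.
type Client struct {
	EventID string      // assumption: string instead of an event UUID
	Send    chan []byte // buffered outbound queue drained by WritePump
}

// Room holds every local client for one event.
type Room struct {
	mu      sync.Mutex
	Clients map[*Client]bool // used as a set
}

type BroadcastMessage struct {
	EventID string
	Message []byte
	Exclude *Client // typically the sender; nil means deliver to everyone
}

// Hub omits rdb and podID to stay dependency-free.
type Hub struct {
	rooms      map[string]*Room
	Register   chan *Client
	Unregister chan *Client
	Broadcast  chan *BroadcastMessage
}

func NewHub() *Hub {
	return &Hub{
		rooms:      make(map[string]*Room),
		Register:   make(chan *Client),
		Unregister: make(chan *Client),
		Broadcast:  make(chan *BroadcastMessage, 256), // documented buffer size
	}
}

// registerClient finds or creates the event's room and adds the client to it.
// h.rooms has no lock of its own, so this is only safe on the Run goroutine.
func (h *Hub) registerClient(c *Client) *Room {
	room, ok := h.rooms[c.EventID]
	if !ok {
		room = &Room{Clients: make(map[*Client]bool)}
		h.rooms[c.EventID] = room
	}
	room.mu.Lock()
	room.Clients[c] = true
	room.mu.Unlock()
	return room
}
```

Serialising register/unregister/broadcast through one goroutine is what lets the room map itself go unlocked; the per-room mutex only guards the client set.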

There is one Hub per pod. It starts inside main.go as go app.Hub.Run() whenever the process runs in a mode that serves WS (full, multi-hub, ws-hub). The api-hub and async modes skip Hub startup entirely.

Process modes and the Hub

The single tomoda-backend binary chooses what to start based on the --mode flag (or SERVER_MODE env). The matrix:

Mode        Hub starts?   HTTP /api/v1?   /ws/chat/:eventId?   Async (worker+scheduler)?
full        yes                           yes
multi-hub   yes           yes             yes
async       no                            no                   yes
api-hub     no                            no
ws-hub      yes                           yes

api-hub and ws-hub are reserved for the future api/ws deployment split — see the ws-split design doc in the devops repo. Local dev defaults to full. In production today the API deployment runs multi-hub and the async deployment runs async.
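The mode selection can be sketched as below. The mode names and the rule "only WS-serving modes start the Hub" come from this page; resolveMode's precedence (flag over SERVER_MODE, then falling back to full) is an assumption for illustration, and both function names are hypothetical.

```go
package main

import "fmt"

type Mode string

const (
	ModeFull     Mode = "full"
	ModeMultiHub Mode = "multi-hub"
	ModeAsync    Mode = "async"
	ModeAPIHub   Mode = "api-hub"
	ModeWSHub    Mode = "ws-hub"
)

// resolveMode is hypothetical: the --mode flag wins, then SERVER_MODE,
// then "full" (assumed default, matching local dev).
func resolveMode(flagValue, envValue string) (Mode, error) {
	v := flagValue
	if v == "" {
		v = envValue
	}
	if v == "" {
		return ModeFull, nil
	}
	switch m := Mode(v); m {
	case ModeFull, ModeMultiHub, ModeAsync, ModeAPIHub, ModeWSHub:
		return m, nil
	default:
		return "", fmt.Errorf("unknown server mode %q", v)
	}
}

// startsHub mirrors the documented rule: only modes that serve
// /ws/chat/:eventId run go app.Hub.Run().
func startsHub(m Mode) bool {
	switch m {
	case ModeFull, ModeMultiHub, ModeWSHub:
		return true
	default:
		return false
	}
}
```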

Connection lifecycle

sequenceDiagram
    participant Browser
    participant Gin
    participant JWTAuth
    participant ChatHandler
    participant Hub
    participant Room

    Browser->>Gin: GET /ws/chat/:eventId<br/>(Sec-WebSocket-Key, Auth header or ?token=)
    Gin->>JWTAuth: validate JWT
    JWTAuth-->>Gin: userID
    Gin->>ChatHandler: HandleWebSocket
    ChatHandler->>Hub: Register <- client
    Hub->>Room: rooms[eventId] (create if missing)
    Hub->>Room: room.Clients[client] = true
    par read loop
        Browser->>ChatHandler: ReadPump (handles ping/text)
    and write loop
        ChatHandler->>Browser: WritePump (drains Send, pings every 54s)
    end
    Browser->>ChatHandler: { type: "send_message", data: ... }
    ChatHandler->>Hub: Broadcast <- BroadcastMessage
    Hub->>Room: fan out to all clients
    Room->>Browser: forward to other participants

Authentication

The WebSocket upgrade endpoint sits behind middleware.JWTAuth(authService):

ws.GET("/chat/:eventId", middleware.JWTAuth(app.AuthService),
       app.Handlers.ChatHandler.HandleWebSocket)

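The token is read from the Authorization header; the lifecycle diagram above also lists a ?token= query parameter as an alternative. Browser clients need such a fallback, because the browser WebSocket API cannot set custom headers on the upgrade request. The :eventId path parameter is bound to the client's EventID field at upgrade time and never changes for the lifetime of the connection: one socket, one room.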

Ping / pong

Tunables in hub.go:

const (
    writeWait  = 10 * time.Second
    pongWait   = 60 * time.Second
    pingPeriod = (pongWait * 9) / 10  // 54s
)
  • The server sends a WebSocket-level ping every 54 seconds.
  • The client must reply with a pong within 60 seconds of the last read, or the connection is closed.
  • The client may also send an application-level { "type": "ping" } message at any time; the server replies with { "type": "pong" } (see Client.handleMessage).

Broadcasting

Hub.Broadcast is a buffered channel (capacity 256). ChatHandler writes a BroadcastMessage{ EventID, Message, Exclude } onto it. For each message the Hub does two things in order:

  1. Local fanout — push to every Client.Send channel in the matching room, optionally skipping the Exclude client (the sender).
  2. Remote publish: PUBLISH chat:event:{eventID} with a JSON envelope {opid: <podID>, xuid: <Exclude.UserID|null>, p: <payload>} so sibling pods can fan out to their local clients.
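The envelope in step 2 can be modelled as a tagged struct. This sketch assumes string pod and user IDs (podID is a uuid.UUID in the real Hub) and uses a *string for xuid so that "no excluded user" encodes as JSON null; envelope, channelFor and encodeEnvelope are hypothetical names.

```go
package main

import "encoding/json"

// envelope mirrors the documented wire shape {opid, xuid, p}.
type envelope struct {
	OriginPod   string          `json:"opid"` // publishing pod, used for dedup
	ExcludeUser *string         `json:"xuid"` // nil encodes as null
	Payload     json.RawMessage `json:"p"`    // already-serialised chat message
}

func channelFor(eventID string) string { return "chat:event:" + eventID }

// encodeEnvelope builds the bytes that would be PUBLISHed on channelFor(eventID).
// payload must itself be valid JSON, or Marshal returns an error.
func encodeEnvelope(podID string, excludeUserID *string, payload []byte) ([]byte, error) {
	return json.Marshal(envelope{OriginPod: podID, ExcludeUser: excludeUserID, Payload: payload})
}
```

Carrying the payload as json.RawMessage avoids decoding and re-encoding the chat message on every hop.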

The subscriber loop on every pod reads from PSUBSCRIBE chat:event:*. On each incoming envelope it:

  • Drops the message if opid == h.podID (this pod already delivered it locally in step 1, before publishing).
  • Otherwise fans the payload out to its local clients in that event's room, skipping any client whose UserID equals xuid when xuid is non-null.
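The subscriber-side decision can be sketched as a pure function. handleRemote and localClient are hypothetical; the room is a slice instead of the real client set, and a full Send buffer is simply skipped here where the real Hub would evict the client.

```go
package main

import "encoding/json"

type envelope struct {
	OriginPod   string          `json:"opid"`
	ExcludeUser *string         `json:"xuid"`
	Payload     json.RawMessage `json:"p"`
}

type localClient struct {
	UserID string
	Send   chan []byte
}

// handleRemote processes one envelope received via PSUBSCRIBE chat:event:*
// and returns how many local clients were handed the payload.
func handleRemote(podID string, raw []byte, room []*localClient) (int, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return 0, err
	}
	if env.OriginPod == podID {
		return 0, nil // our own publish echoed back; already delivered locally
	}
	delivered := 0
	for _, c := range room {
		if env.ExcludeUser != nil && c.UserID == *env.ExcludeUser {
			continue // the sender's user
		}
		select {
		case c.Send <- []byte(env.Payload):
			delivered++
		default:
			// Buffer full: the real Hub evicts the client at this point.
		}
	}
	return delivered, nil
}
```

Remote exclusion is keyed by UserID rather than by *Client, since a client pointer from another pod means nothing locally.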

If a client's Send channel is full (default case in the select), the Hub closes the channel and evicts the client. The corresponding WritePump goroutine exits, closing the socket. This is the only backpressure mechanism — it prefers losing slow clients over blocking the broadcast loop.
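The non-blocking send with eviction is the standard Go select-with-default pattern. A minimal sketch, with a hypothetical fanout method and trimmed Client/Room types:

```go
package main

import "sync"

type Client struct {
	Send chan []byte
}

type Room struct {
	mu      sync.Mutex
	Clients map[*Client]bool
}

// fanout delivers msg to every client except exclude without ever blocking.
// A client whose Send buffer is full is closed and evicted; closing Send is
// what makes its WritePump exit and close the socket. Returns the eviction count.
func (r *Room) fanout(msg []byte, exclude *Client) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	evicted := 0
	for c := range r.Clients {
		if c == exclude {
			continue
		}
		select {
		case c.Send <- msg:
		default:
			close(c.Send)
			delete(r.Clients, c) // deleting during range is safe in Go
			evicted++
		}
	}
	return evicted
}
```

Because closing an already-closed channel panics in Go, any other path that closes Send (such as unregistering) needs to check that the client is still a room member first.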

Reconnect: the subscriber reconnects with a 2-second backoff on transient Redis errors. Messages published during the gap are lost, because Redis pub/sub provides at-most-once delivery and does not buffer for absent subscribers. For chat this is acceptable: the message is already persisted via ChatService.SendMessage, so the client recovers history on reconnect.

Empty rooms

unregisterClient deletes the room from Hub.rooms once it's empty so the map doesn't grow unbounded. This is a tiny but important detail — without it, a process that has handled millions of distinct events would leak per-event memory forever.
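A sketch of unregistration with empty-room cleanup, using trimmed hypothetical types. The membership check makes a second close of Send impossible, which matters if the client was already evicted by a full-buffer fanout.

```go
package main

import "sync"

type Client struct {
	EventID string
	Send    chan []byte
}

type Room struct {
	mu      sync.Mutex
	Clients map[*Client]bool
}

type Hub struct {
	rooms map[string]*Room
}

// unregisterClient runs on the Hub's Run goroutine. It removes the client,
// closes Send only if the client was still a member, and deletes the room
// once it is empty so h.rooms does not grow with every event ever seen.
func (h *Hub) unregisterClient(c *Client) {
	room, ok := h.rooms[c.EventID]
	if !ok {
		return
	}
	room.mu.Lock()
	if room.Clients[c] {
		delete(room.Clients, c)
		close(c.Send)
	}
	empty := len(room.Clients) == 0
	room.mu.Unlock()
	if empty {
		delete(h.rooms, c.EventID)
	}
}
```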

Message types

Incoming JSON is deserialised into models.WSMessage:

type WSMessage struct {
    Type MessageType `json:"type"`
    Data any         `json:"data"`
}

MessageTypePing is answered inside the websocket package (Client.handleMessage); every other type is forwarded to ChatHandler, which decides whether to persist and broadcast it (e.g. send_message, react) or drop it.
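The routing rule can be sketched as a classifier over models.WSMessage. Only the type names that appear on this page are listed; the real MessageType enum may contain more, and classify and action are hypothetical names.

```go
package main

import "encoding/json"

type MessageType string

// Only the types named in this doc; the real enum may have more.
const (
	MessageTypePing        MessageType = "ping"
	MessageTypeSendMessage MessageType = "send_message"
	MessageTypeReact       MessageType = "react"
)

type WSMessage struct {
	Type MessageType `json:"type"`
	Data any         `json:"data"`
}

type action int

const (
	actionDrop action = iota
	actionReplyPong
	actionPersistAndBroadcast
)

// classify decodes one inbound frame and decides what happens to it.
func classify(raw []byte) (action, error) {
	var msg WSMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return actionDrop, err
	}
	switch msg.Type {
	case MessageTypePing:
		return actionReplyPong, nil
	case MessageTypeSendMessage, MessageTypeReact:
		return actionPersistAndBroadcast, nil
	default:
		return actionDrop, nil // unknown types are ignored
	}
}
```

Because Data is declared as any, encoding/json decodes an object payload into map[string]any; a handler that needs a typed payload has to convert it, or the field can be declared as json.RawMessage and decoded later.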

Horizontal scaling

The Hub supports replicas > 1 via the Redis pub/sub fanout described above. Ingress session affinity is not required: a client can connect to any pod and still receive messages from senders on other pods.

Per-pod presence

Hub.GetOnlineCount(eventID) returns the count of clients on the calling pod only. For an event-wide presence number across pods, use the chat:online:* Redis keys that ChatService already maintains — those are pod-independent.

For a deeper protocol-level treatment of how the frontend uses this, see Architecture → Real-time.