# Workers and Scheduler
Two distinct async-work systems run inside the backend process:
| System | Purpose | Source |
|---|---|---|
| Asynq | Fire-and-forget tasks enqueued by request handlers (activity logging, future high-volume work) | `backend/internal/worker/` |
| Distributed Scheduler | Periodic cron jobs that must be triggered exactly once across the cluster (cleanups, status updates, expiry) | `backend/internal/scheduler/` |
Both use Redis as their backing store, but they are separate engines. Don't confuse them.
## Asynq
Asynq is a battle-tested Redis-backed task queue. It runs on every backend instance and consumes tasks from three priority queues.
### Server config
```go
// backend/internal/worker/server.go
asynq.NewServer(redisOpt, asynq.Config{
	Concurrency: concurrency, // from cfg.Worker.Concurrency
	Queues: map[string]int{
		"critical": 6, // highest weight
		"default":  3,
		"low":      1,
	},
	ErrorHandler: ..., // logs failed tasks
})
```
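The weights are relative priorities, not strict ones: when every queue has work, Asynq dequeues from critical, default, and low roughly 60%, 30%, and 10% of the time (a 6:3:1 ratio). `Concurrency` is the maximum number of tasks the server processes at once, summed across all queues.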
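Retries, backoff between attempts (tunable through `asynq.Config.RetryDelayFunc`), and the dead-letter queue are handled by Asynq itself: a task that exhausts its retries is moved to Asynq's archived set, its dead-letter queue, with the last error recorded.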
### Registering a task
- Define the task type and payload in `backend/internal/worker/tasks.go`:
  ```go
  const TaskLogUserActivity = "activity:log"

  type LogActivityPayload struct {
  	UserID       uuid.UUID `json:"user_id"`
  	ActivityType string    `json:"activity_type"`
  	// ...
  }
  ```
- Write the handler in (or alongside) `backend/internal/worker/`:
  ```go
  func (h *TaskHandler) HandleLogUserActivity(ctx context.Context, t *asynq.Task) error {
  	var payload LogActivityPayload
  	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
  		// A malformed payload can never succeed: skip retries, but keep the cause in the error.
  		return fmt.Errorf("...: %v: %w", err, asynq.SkipRetry)
  	}
  	return h.activityLogStore.CreateLogEntry(ctx, payload.UserID, ...)
  }
  ```
- Register the handler in `TaskHandler.RegisterHandlers`:
  ```go
  mux.HandleFunc(TaskLogUserActivity, h.HandleLogUserActivity)
  ```
- Enqueue from a service or handler via `worker.Client.Enqueue(asynq.NewTask(...))`.
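Return `asynq.SkipRetry` (or an error wrapping it with `%w`, as in the handler above) to send a task straight to the dead-letter queue without retrying. This is useful for unrecoverable errors such as unmarshal failures, where a retry can never succeed.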
## Distributed Scheduler
Asynq is great for "do this once, soon-ish" jobs. It is not the right tool for "run this every 5 minutes across the cluster": you would either run the job on every instance (duplicate work) or single out one instance (a single point of failure). Our scheduler solves this with Redis-based leader election.
Source: `backend/internal/scheduler/`.
### Components
```text
Manager
├── Client        // schedules tasks (used internally and by cron handlers)
├── Scheduler     // leader-elected, polls due tasks, runs cron loops
│   ├── runTaskDiscovery  // ZSET -> ready LIST
│   ├── runCronJobs       // periodic enqueues
│   └── Reclaimer         // stalled-task recovery
└── Worker        // consumes the ready LIST, executes handlers (every instance)
```
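Every backend instance runs the Client and Worker; only the elected leader runs the Scheduler loops. Leader election is a Redis `SETNX` on `scheduler:leader` with a 10-second TTL, which the current leader refreshes every 5 seconds. If the leader stops refreshing, the key expires and another instance can take over.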
### Redis keys
| Key | Type | Purpose |
|---|---|---|
| `tasks:scheduled` | ZSET | Pending tasks, score = epoch timestamp, member = task ID |
| `tasks:payload` | HASH | Task payloads keyed by task ID |
| `tasks:queue:default` | LIST | Tasks ready to execute |
| `tasks:processing` | LIST | In-flight tasks (managed via `BRPOPLPUSH`) |
| `tasks:active:<id>` | STRING | Heartbeat lock for a running task |
| `scheduler:leader` | STRING | Leader-election lock |
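The `runTaskDiscovery` step moves due IDs from `tasks:scheduled` onto `tasks:queue:default`. The sketch below models the ZSET as a slice kept sorted by score. `schedule` and `popDue` are hypothetical, and the Redis commands in the comments are one way to do the same thing, not necessarily the ones the scheduler issues.

```go
package main

import (
	"fmt"
	"sort"
)

type entry struct {
	id    string
	score int64 // epoch seconds when the task becomes due
}

// schedule is an in-memory stand-in for the tasks:scheduled ZSET.
type schedule struct{ entries []entry }

func (s *schedule) add(id string, at int64) {
	s.entries = append(s.entries, entry{id, at})
	sort.SliceStable(s.entries, func(i, j int) bool { return s.entries[i].score < s.entries[j].score })
}

// popDue removes every entry with score <= now, in due order. In Redis this is
// ZRANGEBYSCORE tasks:scheduled -inf <now> followed by ZREM; if more than one
// process polled, range-then-remove would need to be atomic (e.g. a Lua
// script). Only the leader polls here, which also prevents double claims.
func (s *schedule) popDue(now int64) []string {
	i := sort.Search(len(s.entries), func(i int) bool { return s.entries[i].score > now })
	due := make([]string, 0, i)
	for _, e := range s.entries[:i] {
		due = append(due, e.id)
	}
	s.entries = s.entries[i:]
	return due
}

func main() {
	var zset schedule
	var ready []string // stands in for tasks:queue:default
	zset.add("t3", 300)
	zset.add("t1", 100)
	zset.add("t2", 200)

	ready = append(ready, zset.popDue(250)...)
	fmt.Println(ready, len(zset.entries)) // [t1 t2] 1
}
```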
### Cron tasks
Registered in `scheduler.NewManager`:
| Task | Interval | What it does |
|---|---|---|
| `status_update` | 5m | Recomputes event status (upcoming → live → finished) |
| `redis_sync` | 5m | Re-syncs event geo locations into Redis from Postgres |
| `token_cleanup` | 1h | Deletes expired refresh tokens, OTPs, and sessions |
| `account_suspension` | 12h | Suspends accounts flagged for moderation |
| `purge` | 24h | Deletes data marked for purge per retention policy |
| `deactivated_acct_purge` | 24h | Hard-deletes accounts after the deactivation grace period |
| `moment_cleanup` | 5m | Soft-deletes expired non-journaled moments |
| `moment_purge` | 24h | Hard-deletes soft-deleted moments older than 7 days |
| `message_expiry` | 30s | Hard-deletes disappearing messages past their `expires_at` |
The cron functions just enqueue a task; the actual work runs on whichever worker dequeues it next, so the load is distributed across the cluster even though scheduling is centralised on the leader.
### Adding a cron job
- Define a new `TaskType` constant in `scheduler/types.go`.
- Implement `HandlerService.HandleXxx` in `scheduler/handlers.go`.
- Register the handler with the worker and the cron in `scheduler/manager.go`:
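```go
worker.RegisterHandlerFunc(TaskTypeMyJob, handlerService.HandleMyJob)

scheduler.RegisterCron("my_job", 1*time.Hour, func(ctx context.Context) {
	// The cron callback only enqueues; the handler runs on whichever worker picks the task up.
	if _, err := client.EnqueueTask(ctx, TaskTypeMyJob, nil); err != nil {
		log.Printf("enqueue %v: %v", TaskTypeMyJob, err) // or the project's logger
	}
})
```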
### Reliability
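- **Atomic scheduling.** `ScheduleTask` sets the payload (HASH) and adds the task to the schedule (ZSET) in one Redis pipeline, so neither write lands without the other. A plain pipeline only batches round trips; this guarantee relies on the pipeline running as a `MULTI`/`EXEC` transaction.
- **Leader election.** Only the leader polls `tasks:scheduled`, preventing duplicate enqueues.
- **Reliable queueing.** Workers use `BRPOPLPUSH` to move a task to `tasks:processing` before running it, so a claimed task is never held only in a worker's memory.
- **Reclaimer.** The leader periodically checks `tasks:processing` for stalled tasks (the worker crashed mid-execution) and moves them back to the ready queue.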
### Worker concurrency
`Manager.Start()` starts the scheduler worker with a hard-coded concurrency of 5. This is independent of Asynq's `cfg.Worker.Concurrency` setting: the two systems do not share a worker pool.
## Operating notes
- The leader is sticky-ish but not pinned. If the leader process dies, another instance acquires `scheduler:leader` within about 10 seconds, once the TTL expires.
- A cron task that takes longer than its interval will overlap with its next run on the worker side. That's by design: workers run independently of scheduling.
- Asynq has its own web UI (`asynqmon`). We don't deploy it today, but it would read the data Asynq already keeps in Redis as-is; it does not cover the scheduler's `tasks:*` keys.
## See also
- Redis: the underlying store
- `backend/internal/scheduler/scheduler.md`: original engineering doc (kept in source for reference)