Workers and Scheduler

Two distinct async-work systems run inside the backend process:

| System | Purpose | Source |
| --- | --- | --- |
| Asynq | Fire-and-forget tasks enqueued by request handlers (activity logging, future high-volume work) | backend/internal/worker/ |
| Distributed Scheduler | Periodic cron jobs that must run on exactly one instance (cleanups, status updates, expiry) | backend/internal/scheduler/ |

Both use Redis as their backing store but they are separate engines. Don't confuse them.

Asynq

Asynq is a battle-tested Redis-backed task queue. It runs on every backend instance and consumes tasks from three priority queues.

Server config

// backend/internal/worker/server.go
asynq.NewServer(redisOpt, asynq.Config{
    Concurrency: concurrency,  // from cfg.Worker.Concurrency
    Queues: map[string]int{
        "critical": 6,  // highest weight
        "default":  3,
        "low":      1,
    },
    ErrorHandler: ...,  // logs failed tasks
})

The weights are relative dequeue ratios — Asynq pulls 6:3:1 from critical:default:low. Concurrency is how many tasks the server processes simultaneously across all queues.
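To make the ratio concrete, here is a minimal sketch that turns the weights into approximate dequeue shares. It assumes non-strict priority (the weights act as proportions, not a strict ordering); `queueShares` is a hypothetical helper for illustration and is not part of Asynq or of this codebase.

```go
package main

import (
	"fmt"
	"sort"
)

// queueShares converts relative queue weights into the approximate fraction
// of dequeue attempts each queue receives when priorities are non-strict.
// Hypothetical helper, for illustration only.
func queueShares(weights map[string]int) map[string]float64 {
	total := 0
	for _, w := range weights {
		total += w
	}
	shares := make(map[string]float64, len(weights))
	if total == 0 {
		return shares
	}
	for name, w := range weights {
		shares[name] = float64(w) / float64(total)
	}
	return shares
}

func main() {
	weights := map[string]int{"critical": 6, "default": 3, "low": 1}
	shares := queueShares(weights)

	names := make([]string, 0, len(shares))
	for name := range shares {
		names = append(names, name)
	}
	sort.Strings(names) // map iteration order is random; sort for stable output

	for _, name := range names {
		fmt.Printf("%-8s %.0f%%\n", name, shares[name]*100)
	}
}
```

With the weights above this works out to roughly 60% critical, 30% default, and 10% low. These are long-run proportions, so a busy low queue still makes progress while critical is backed up.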

Retries, exponential backoff, and dead-lettering are handled by Asynq itself; a task that exhausts its retries is moved to Asynq's archive (its equivalent of a dead-letter queue) with the last error recorded.

Registering a task

  1. Define the task type and payload in backend/internal/worker/tasks.go:
const TaskLogUserActivity = "activity:log"

type LogActivityPayload struct {
    UserID       uuid.UUID `json:"user_id"`
    ActivityType string    `json:"activity_type"`
    // ...
}
  2. Write the handler in (or alongside) backend/internal/worker/:
func (h *TaskHandler) HandleLogUserActivity(ctx context.Context, t *asynq.Task) error {
    var payload LogActivityPayload
    if err := json.Unmarshal(t.Payload(), &payload); err != nil {
        // A malformed payload will never succeed on retry: keep the cause
        // in the message and wrap SkipRetry so Asynq archives the task.
        return fmt.Errorf("...: %v: %w", err, asynq.SkipRetry)
    }
    return h.activityLogStore.CreateLogEntry(ctx, payload.UserID, ...)
}
  3. Register the handler in TaskHandler.RegisterHandlers:
mux.HandleFunc(TaskLogUserActivity, h.HandleLogUserActivity)
  4. Enqueue from a service or handler via worker.Client.Enqueue(asynq.NewTask(...)).

Return an error that wraps asynq.SkipRetry from your handler to archive a task immediately, with no further retries. This is useful for unrecoverable errors such as unmarshal failures.
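The wrapping pattern can be tried out without pulling in Asynq. The sketch below uses a local sentinel, `errSkipRetry`, as a stand-in for asynq.SkipRetry, and a simplified payload with a string `UserID` instead of uuid.UUID; `decodePayload`, `shouldRetry`, and `errTransient` are hypothetical names for illustration. It shows that a sentinel wrapped with `%w` stays detectable through `errors.Is` while the original cause stays in the message.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errSkipRetry is a local stand-in for asynq.SkipRetry so the sketch
// compiles with the standard library alone.
var errSkipRetry = errors.New("skip retry for the task")

// errTransient represents a failure that is worth retrying (assumed example).
var errTransient = errors.New("temporary store failure")

// logActivityPayload mirrors part of LogActivityPayload; UserID is a plain
// string here instead of uuid.UUID.
type logActivityPayload struct {
	UserID       string `json:"user_id"`
	ActivityType string `json:"activity_type"`
}

// decodePayload returns an error wrapping errSkipRetry when the payload is
// malformed, because retrying the same bytes cannot succeed.
func decodePayload(raw []byte) (logActivityPayload, error) {
	var p logActivityPayload
	if err := json.Unmarshal(raw, &p); err != nil {
		return p, fmt.Errorf("decode payload: %v: %w", err, errSkipRetry)
	}
	return p, nil
}

// shouldRetry reports whether a handler error should be retried: any
// non-nil error that does not wrap the skip-retry sentinel.
func shouldRetry(err error) bool {
	return err != nil && !errors.Is(err, errSkipRetry)
}

func main() {
	_, err := decodePayload([]byte(`{"user_id": 42`)) // truncated JSON
	fmt.Println("error:", err)
	fmt.Println("retry malformed payload:", shouldRetry(err))
	fmt.Println("retry transient error:", shouldRetry(errTransient))

	p, err := decodePayload([]byte(`{"user_id":"u-1","activity_type":"login"}`))
	fmt.Println(p.ActivityType, err)
}
```

Keeping the underlying error in the message (via `%v`) means the archived task still shows why decoding failed, not only that retries were skipped.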

Distributed Scheduler

Asynq is great for "do this once, soon-ish" jobs. It is not the right tool for "run this every 5 minutes across the cluster" — you'd either run it on every instance (duplicate work) or single out one instance (single point of failure). Our scheduler solves that with Redis-based leader election.

Source: backend/internal/scheduler/.

Components

Manager
 ├── Client     // schedules tasks (used internally and by cron handlers)
 ├── Scheduler  // leader-elected, polls due tasks, runs cron loops
 │      ├── runTaskDiscovery   // ZSET -> ready LIST
 │      ├── runCronJobs        // periodic enqueues
 │      └── Reclaimer          // stalled-task recovery
 └── Worker     // consumes the ready LIST, executes handlers (every instance)

Every backend instance runs the Client and Worker. Only the elected leader runs the Scheduler loops. Leader election is a Redis SETNX on scheduler:leader with a 10-second TTL, refreshed every 5 seconds.
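The lease behaviour can be sketched with an in-memory stand-in for the scheduler:leader key. This is a model under stated assumptions, not the scheduler's actual code: `leaseStore`, `tryAcquire`, `refresh`, and `at` are hypothetical names, and the real implementation would issue the equivalent Redis commands instead of mutating a struct.

```go
package main

import (
	"fmt"
	"time"
)

// leaseTTL matches the 10-second TTL described above; the holder is
// expected to refresh every 5 seconds.
const leaseTTL = 10 * time.Second

// leaseStore is an in-memory stand-in for the scheduler:leader key.
// It is not safe for concurrent use; Redis provides that in the real system.
type leaseStore struct {
	owner     string
	expiresAt time.Time
}

func newLeaseStore() *leaseStore { return &leaseStore{} }

// tryAcquire mimics a set-if-absent with a TTL: it succeeds only when no
// unexpired lease exists.
func (s *leaseStore) tryAcquire(owner string, now time.Time) bool {
	if s.owner != "" && now.Before(s.expiresAt) {
		return false
	}
	s.owner, s.expiresAt = owner, now.Add(leaseTTL)
	return true
}

// refresh extends the lease, but only for the instance that still holds it.
func (s *leaseStore) refresh(owner string, now time.Time) bool {
	if s.owner != owner || !now.Before(s.expiresAt) {
		return false
	}
	s.expiresAt = now.Add(leaseTTL)
	return true
}

// at returns a fixed reference time plus sec seconds, keeping the demo
// deterministic.
func at(sec int) time.Time { return time.Unix(int64(sec), 0) }

func main() {
	s := newLeaseStore()
	fmt.Println("a acquires at 0s: ", s.tryAcquire("a", at(0)))
	fmt.Println("b acquires at 5s: ", s.tryAcquire("b", at(5)))
	fmt.Println("a refreshes at 5s:", s.refresh("a", at(5)))
	// Suppose a crashes right after the refresh: its lease runs out at 15s.
	fmt.Println("b acquires at 12s:", s.tryAcquire("b", at(12)))
	fmt.Println("b acquires at 15s:", s.tryAcquire("b", at(15)))
}
```

Refreshing at half the TTL gives the leader one missed refresh of slack before another instance can take over.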

Redis keys

| Key | Type | Purpose |
| --- | --- | --- |
| tasks:scheduled | ZSET | Pending tasks, score = epoch timestamp, member = task ID |
| tasks:payload | HASH | Task payloads keyed by task ID |
| tasks:queue:default | LIST | Tasks ready to execute |
| tasks:processing | LIST | In-flight tasks (managed via BRPOPLPUSH) |
| tasks:active:<id> | STRING | Heartbeat lock for a running task |
| scheduler:leader | STRING | Leader-election lock |
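As a rough model of how these keys interact during task discovery, the sketch below moves due tasks from the scheduled set to the ready list using plain Go maps and slices. The `store` type and `discoverDue` are hypothetical; the ordering rule (oldest score first, ties broken by ID) is an assumption, and the real runTaskDiscovery loop would perform the equivalent range query and moves against Redis.

```go
package main

import (
	"fmt"
	"sort"
)

// store is an in-memory stand-in for three of the Redis keys listed above.
type store struct {
	scheduled map[string]int64  // tasks:scheduled: task ID -> epoch seconds
	payloads  map[string]string // tasks:payload: task ID -> payload
	ready     []string          // tasks:queue:default, oldest first
}

// discoverDue moves every task whose score is <= now from scheduled to
// ready, oldest first, and returns how many tasks were moved. Payloads stay
// in place so the worker can look them up by task ID.
func (s *store) discoverDue(now int64) int {
	due := make([]string, 0, len(s.scheduled))
	for id, runAt := range s.scheduled {
		if runAt <= now {
			due = append(due, id)
		}
	}
	sort.Slice(due, func(i, j int) bool {
		a, b := s.scheduled[due[i]], s.scheduled[due[j]]
		if a != b {
			return a < b
		}
		return due[i] < due[j] // deterministic order for equal scores
	})
	for _, id := range due {
		delete(s.scheduled, id)
		s.ready = append(s.ready, id)
	}
	return len(due)
}

func main() {
	s := &store{
		scheduled: map[string]int64{"a": 100, "b": 50, "c": 200},
		payloads:  map[string]string{"a": "{}", "b": "{}", "c": "{}"},
	}
	moved := s.discoverDue(150)
	fmt.Println("moved:", moved)
	fmt.Println("ready:", s.ready)
	fmt.Println("still scheduled:", len(s.scheduled))
}
```

Using the run-at time as the score is what makes "everything due by now" a single range query over the sorted set.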

Cron tasks

Registered in scheduler.NewManager:

| Task | Interval | What it does |
| --- | --- | --- |
| status_update | 5m | Recomputes event status (upcoming → live → finished) |
| redis_sync | 5m | Re-syncs event geo locations into Redis from Postgres |
| token_cleanup | 1h | Deletes expired refresh tokens, OTPs, and sessions |
| account_suspension | 12h | Suspends accounts flagged for moderation |
| purge | 24h | Deletes data marked for purge per retention policy |
| deactivated_acct_purge | 24h | Hard-deletes accounts after the deactivation grace period |
| moment_cleanup | 5m | Soft-deletes expired non-journaled moments |
| moment_purge | 24h | Hard-deletes soft-deleted moments older than 7 days |
| message_expiry | 30s | Hard-deletes disappearing messages past their expires_at |

The cron functions just enqueue a task; the actual work runs on whichever worker dequeues it next, so the load is distributed across the cluster even though scheduling is centralised on the leader.

Adding a cron job

  1. Define a new TaskType constant in scheduler/types.go.
  2. Implement HandlerService.HandleXxx in scheduler/handlers.go.
  3. Register the handler with the worker and the cron in scheduler/manager.go:
worker.RegisterHandlerFunc(TaskTypeMyJob, handlerService.HandleMyJob)
scheduler.RegisterCron("my_job", 1*time.Hour, func(ctx context.Context) {
    _, _ = client.EnqueueTask(ctx, TaskTypeMyJob, nil)
})
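For intuition about what a registration like this implies at runtime, here is a small self-contained model of an interval-based cron registry. It is a sketch, not the scheduler's implementation: `cronRegistry`, `register`, `tick`, and `at` are hypothetical, and it assumes the first run happens one interval after registration and that missed runs are not replayed.

```go
package main

import (
	"fmt"
	"time"
)

// cronJob pairs an interval with the function that enqueues the task.
type cronJob struct {
	name    string
	every   time.Duration
	nextRun time.Time
	enqueue func()
}

// cronRegistry is a hypothetical in-memory model of the leader's cron loop.
type cronRegistry struct{ jobs []*cronJob }

// register adds a job whose first run is one interval after now (assumption).
func (r *cronRegistry) register(name string, every time.Duration, now time.Time, enqueue func()) {
	r.jobs = append(r.jobs, &cronJob{name: name, every: every, nextRun: now.Add(every), enqueue: enqueue})
}

// tick fires every job that is due at now, schedules its next run relative
// to now, and returns the names of the jobs that fired.
func (r *cronRegistry) tick(now time.Time) []string {
	var fired []string
	for _, j := range r.jobs {
		if now.Before(j.nextRun) {
			continue
		}
		j.enqueue()
		j.nextRun = now.Add(j.every)
		fired = append(fired, j.name)
	}
	return fired
}

// at returns a fixed reference time plus sec seconds.
func at(sec int) time.Time { return time.Unix(int64(sec), 0) }

func main() {
	enqueued := map[string]int{}
	r := &cronRegistry{}
	r.register("message_expiry", 30*time.Second, at(0), func() { enqueued["message_expiry"]++ })
	r.register("my_job", time.Hour, at(0), func() { enqueued["my_job"]++ })

	// Simulate one hour of leader ticks at 30-second resolution.
	for sec := 0; sec <= 3600; sec += 30 {
		r.tick(at(sec))
	}
	fmt.Println(enqueued)
}
```

The callback only enqueues, so a tick stays cheap no matter how long the job itself takes on a worker.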

Reliability

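  • Atomic scheduling. ScheduleTask writes the payload (HASH) and the schedule entry (ZSET) in a single Redis pipeline. Only a transactional pipeline (MULTI/EXEC) makes the two writes atomic; a plain pipeline merely batches them into one round trip.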
  • Leader election. Only the leader polls tasks:scheduled, preventing duplicate enqueues.
  • Reliable queueing. Workers use BRPOPLPUSH to move a task to tasks:processing before running it.
  • Reclaimer. The leader periodically checks tasks:processing for stalled tasks (worker crashed mid-execution) and moves them back to the ready queue.
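The claim and reclaim steps can be modelled in memory as follows. This is a sketch under assumptions: `taskQueue`, `claim`, `ack`, and `reclaim` are hypothetical names, a task is treated as stalled when its tasks:active:<id> heartbeat has expired, and reclaimed tasks are appended to the end of the ready queue. The real code performs these moves in Redis.

```go
package main

import "fmt"

// taskQueue is an in-memory stand-in for the ready list, the processing
// list, and the per-task heartbeat keys.
type taskQueue struct {
	ready      []string         // tasks:queue:default, oldest first
	processing []string         // tasks:processing
	heartbeat  map[string]int64 // tasks:active:<id> -> expiry in epoch seconds
}

func newTaskQueue(ids ...string) *taskQueue {
	return &taskQueue{ready: append([]string(nil), ids...), heartbeat: map[string]int64{}}
}

// claim moves the oldest ready task to processing before it runs, so a
// crash after this point leaves a trace the reclaimer can find.
func (q *taskQueue) claim(now, ttl int64) (string, bool) {
	if len(q.ready) == 0 {
		return "", false
	}
	id := q.ready[0]
	q.ready = q.ready[1:]
	q.processing = append(q.processing, id)
	q.heartbeat[id] = now + ttl
	return id, true
}

// ack removes a finished task from processing and drops its heartbeat.
func (q *taskQueue) ack(id string) {
	kept := q.processing[:0]
	for _, p := range q.processing {
		if p != id {
			kept = append(kept, p)
		}
	}
	q.processing = kept
	delete(q.heartbeat, id)
}

// reclaim returns stalled tasks (missing or expired heartbeat) to the ready
// queue and reports which IDs were moved.
func (q *taskQueue) reclaim(now int64) []string {
	var stalled, alive []string
	for _, id := range q.processing {
		if exp, ok := q.heartbeat[id]; !ok || exp <= now {
			stalled = append(stalled, id)
		} else {
			alive = append(alive, id)
		}
	}
	q.processing = alive
	for _, id := range stalled {
		delete(q.heartbeat, id)
		q.ready = append(q.ready, id)
	}
	return stalled
}

func main() {
	q := newTaskQueue("t1", "t2")
	a, _ := q.claim(0, 30)
	b, _ := q.claim(0, 30)
	q.ack(a) // t1 finished; the worker holding t2 crashes without acking
	fmt.Println("claimed:", a, b)
	fmt.Println("reclaimed at 10s:", q.reclaim(10))
	fmt.Println("reclaimed at 30s:", q.reclaim(30))
	fmt.Println("ready:", q.ready)
}
```

Because a reclaimed task runs again from the start, handlers should tolerate being executed more than once for the same task.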

Worker concurrency

Manager.Start() starts the scheduler worker with a hard-coded concurrency of 5. This is independent of the Asynq cfg.Worker.Concurrency setting; the two systems do not share worker pools.
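A fixed concurrency of 5 amounts to a bounded worker pool. The sketch below is a generic version of that pattern, not the scheduler's code: `runPool` and `sleepJobs` are hypothetical helpers, and the peak counter exists only to show that no more than `concurrency` jobs run at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runPool executes jobs on a fixed number of goroutines and returns how many
// jobs completed and the highest number that were in flight at the same time.
func runPool(concurrency int, jobs []func()) (processed, peak int64) {
	var inFlight, done, maxSeen int64
	ch := make(chan func())
	var wg sync.WaitGroup

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range ch {
				n := atomic.AddInt64(&inFlight, 1)
				// Record a new peak with a compare-and-swap loop.
				for {
					m := atomic.LoadInt64(&maxSeen)
					if n <= m || atomic.CompareAndSwapInt64(&maxSeen, m, n) {
						break
					}
				}
				job()
				atomic.AddInt64(&inFlight, -1)
				atomic.AddInt64(&done, 1)
			}
		}()
	}

	for _, job := range jobs {
		ch <- job // blocks while all workers are busy
	}
	close(ch)
	wg.Wait()
	return atomic.LoadInt64(&done), atomic.LoadInt64(&maxSeen)
}

// sleepJobs builds n jobs that each sleep for d, standing in for handlers.
func sleepJobs(n int, d time.Duration) []func() {
	jobs := make([]func(), n)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(d) }
	}
	return jobs
}

func main() {
	processed, peak := runPool(5, sleepJobs(20, 2*time.Millisecond))
	fmt.Println("processed:", processed)
	fmt.Println("peak in flight never exceeds 5:", peak <= 5)
}
```

The unbuffered channel is what applies backpressure: the producer blocks until one of the five workers is free to take the next job.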

Operating notes

  • The leader is sticky-ish but not pinned. If a leader process dies, another instance acquires scheduler:leader within ~10 seconds (TTL expiry).
  • A cron task that takes longer than its interval will overlap on the worker side. That's by design — workers run independently from scheduling.
  • Asynq has its own web UI (asynqmon); we don't deploy it today but the data structures are compatible.

See also

  • Redis — the underlying store
  • backend/internal/scheduler/scheduler.md — original engineering doc (kept in source for reference)