
BullMQ Integration Playbook

1. Overview

BullMQ is the async job runtime inside brain. Producers (HTTP/gRPC controllers and services) push work onto Redis-backed queues; processors run inside the same NestJS process and consume jobs concurrently. There are no standalone worker pods today — every brain replica is also a worker. The wiring uses @nestjs/bullmq and bullmq (see apps/brain/package.json).

Redis is the only state for queues. Brain’s owned data lives in Postgres (fennec schema) — the queue is operational glue, not a system of record. When jobs need durability beyond Redis (e.g. shop-VI imports), the long-lived state is mirrored in Prisma rows and the BullMQ job becomes a transient “go do this” trigger.

2. Architecture

flowchart LR
subgraph brisket
UI[brisket / fennec UI]
end
subgraph sirloin
SIR[sirloin gRPC]
end
subgraph brain["brain (NestJS)"]
PROD[Producer service<br/>InjectQueue]
PROC[Processor<br/>@Processor + ContextualProcessor]
SCHED[QueueCleanupService<br/>@Cron midnight]
end
REDIS[(Redis<br/>REDIS_HOST:REDIS_PORT)]
PG[(Postgres<br/>fennec)]
ROUND[round gRPC]
FAL[FAL / RunPod / WaveSpeed / Kling]
BB[Bull-Board UI<br/>GET /queues]
UI -->|REST/tRPC| SIR
SIR -->|gRPC| PROD
PROD -->|queue.add jobName, payload, opts| REDIS
REDIS -->|fetched by worker| PROC
PROC -->|update state| PG
PROC -->|inference| ROUND
PROC -->|provider calls| FAL
SCHED -->|clean completed/failed| REDIS
REDIS --- BB

Producers and processors live in the same brain process. Wiring happens in module imports via BullModule.forRootAsync (root config, apps/brain/src/modules/domain/generation/generation.module.ts:67) plus per-queue BullModule.registerQueue / registerQueueAsync calls. Concurrency is set on the @Processor decorator.

The queue name table:

| Queue (string id) | Constant |
| --- | --- |
| mediaflows | QUEUES.MEDIA_FLOWS |
| exampleexplicitness | QUEUES.EXAMPLE_EXPLICITNESS |
| shopviimport | QUEUES.SHOP_VI_IMPORT |
| kitsunesfwimagefromnsfwgeneration | QUEUES.KITSUNE_SFW_IMAGE_FROM_NSFW_GENERATION |

Source: apps/brain/src/common/constants.ts.

3. Queues + jobs

| Queue | Producer code path | Processor code path | Job name(s) | Payload | Concurrency | Retry policy |
| --- | --- | --- | --- | --- | --- | --- |
| mediaflows | apps/brain/src/modules/domain/generation/services/media-scheduling.service.ts (processMediaGeneration); also enqueued via media.service.ts and queue-stats.service.ts lookups | apps/brain/src/modules/domain/generation/processors/media-flows.processor.ts (MediaFlowsProcessor) | media-flow-${mediaId} | IGenerationJob = { generationRequest: IQueueGeneration, jobId: string } (extends QueueJobData with correlationId, currentUser, traceparent, tracestate). Defined in apps/brain/src/modules/domain/generation/interfaces/generation.interface.ts:114 | 500 (very high; see Failure modes) | Global defaults: removeOnComplete after 24h / 1000 jobs, removeOnFail after 7d / 5000 jobs. No queue-level attempts override → defaults to 1 (no retry on transient errors). TODO(@pawel): confirm intentional. |
| exampleexplicitness | apps/brain/src/modules/domain/generation/services/tag.service.ts (recalculate-for-tag) | apps/brain/src/modules/domain/generation/processors/example-explicitness.processor.ts (ExampleExplicitnessProcessor) | recalculate-for-tag | ExampleExplicitnessJobData = { tagId, oldLevel, newLevel } extends QueueJobData | 1 (sequential; batch size 500 rows per job) | Global defaults only; no override |
| shopviimport | apps/brain/src/modules/domain/shop-vi/services/shop-vi-import.service.ts | apps/brain/src/modules/domain/shop-vi/processors/shop-vi-import.processor.ts (ShopVIImportProcessor) | import, webhook-import | ShopVIImportJobPayload = { jobId: number, directoryPath: string, characterPaths?: string[] } | 1 | Global defaults only |
| kitsunesfwimagefromnsfwgeneration | apps/brain/src/modules/domain/character/services/character.service.ts (generate) | apps/brain/src/modules/domain/character/processors/kitsune-sfw-image-from-nsfw-generation.processor.ts | generate | KitsuneSfwImageFromNsfwGenerationJobPayload = { characterId: string, nsfwSourceTypes?: KitsuneOnboardingImageType[] } | 20 | Per-queue override: attempts: 5, backoff: { type: 'exponential', delay: 10_000 } (apps/brain/src/modules/domain/character/kitsune-sfw-image-from-nsfw-generation-queue.constants.ts) |
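
To make the kitsune retry policy concrete, the sketch below computes the wait before each retry for attempts: 5 with a 10_000 ms exponential backoff. It assumes BullMQ's documented built-in formula (delay × 2^(attemptsMade − 1), no jitter); the helper name exponentialBackoffDelays is illustrative and does not exist in the repo.

```typescript
// Sketch only: mirrors the formula BullMQ documents for its built-in
// "exponential" backoff without jitter: delay * 2^(attemptsMade - 1).
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // A job with `attempts` total tries has at most `attempts - 1` retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs));
  }
  return delays;
}

// Values from the kitsune per-queue override: attempts 5, delay 10_000 ms.
const kitsuneDelays = exponentialBackoffDelays(5, 10_000);
// kitsuneDelays is [10000, 20000, 40000, 80000]

// Total time spent waiting between tries, excluding the job's own run time.
const totalBackoffMs = kitsuneDelays.reduce((sum, d) => sum + d, 0);
// totalBackoffMs is 150000 (2.5 minutes)
```

Under these assumptions a kitsune job that keeps failing lands in the failed set roughly 2.5 minutes plus five run durations after its first attempt.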

Global default options (applied to every queue unless overridden) are in apps/brain/src/config/queue.config.ts.
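
A rough sketch of how the global defaults and a per-queue override might layer. The actual contents of queue.config.ts are not reproduced here: the constant names, the local interfaces (stand-ins for BullMQ's job option shapes, where age is in seconds), and the shallow merge are assumptions for illustration. Only the numeric values come from the table above.

```typescript
// Local stand-ins for the BullMQ option shapes used here (age is in seconds).
interface KeepJobs { age?: number; count?: number }
interface BackoffOptions { type: 'fixed' | 'exponential'; delay: number }
interface JobOptions {
  attempts?: number;
  backoff?: BackoffOptions;
  removeOnComplete?: KeepJobs;
  removeOnFail?: KeepJobs;
}

// Assumed shape of the global defaults, using the retention values from the table.
const GLOBAL_DEFAULT_JOB_OPTIONS: JobOptions = {
  removeOnComplete: { age: 24 * 60 * 60, count: 1000 },
  removeOnFail: { age: 7 * 24 * 60 * 60, count: 5000 },
};

// Per-queue override documented for kitsunesfwimagefromnsfwgeneration.
const KITSUNE_JOB_OPTIONS: JobOptions = {
  attempts: 5,
  backoff: { type: 'exponential', delay: 10_000 },
};

// Shallow merge: override keys win, untouched defaults carry through.
function resolveJobOptions(overrides: JobOptions = {}): JobOptions {
  return { ...GLOBAL_DEFAULT_JOB_OPTIONS, ...overrides };
}

const kitsuneOptions = resolveJobOptions(KITSUNE_JOB_OPTIONS);
const mediaFlowsOptions = resolveJobOptions();
// mediaFlowsOptions.attempts stays undefined, so the job is tried once.
```

The point of the sketch is that a queue without an override inherits retention but no retry behaviour, which is the situation flagged for mediaflows.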

4. Idempotency strategy

Mixed: each queue handles idempotency differently.

  • mediaflows uses an explicit jobId derived from the persisted genId at enqueue time — await this.contextualMediaFlowQueue.add(...,..., { jobId }) (media-scheduling.service.ts). BullMQ deduplicates jobs sharing a jobId while the prior copy is still present, so a duplicate enqueue with the same generation row is a no-op. The Postgres Generation row is created before the job is added, so the row is the durable identity.
  • shopviimport does not pass jobId. Idempotency is enforced one layer up in shop-vi-import.service.ts: the service looks up the existing ShopVIImportJob by directoryPath (or by upstream webhook job_id) and only re-enqueues when status is PENDING/FAILED. Once IN_PROGRESS or COMPLETED, the producer short-circuits and returns the existing record.
  • kitsunesfwimagefromnsfwgeneration does not pass jobId. Re-runs are tolerated; each attempt regenerates assets for a characterId. With attempts: 5 and exponential backoff, the same payload may run multiple times. Storage keys are not deterministic: kitsune-sfw-image-from-nsfw-generation.service.ts:134 calls buildOnboardingKey(characterId, uuidv4(), 'jpg'), so each retry writes a fresh object rather than overwriting. Garbage-collection of orphaned keys is implicit (DB row points only at the latest outputStorageKey). TODO(@pawel): consider keying by (characterId, nsfwSourceType) to overwrite or to enable explicit cleanup.
  • exampleexplicitness does not pass jobId. The job operates against a tag’s current state and is safe to re-run, but stacking multiple jobs for the same tagId is not prevented. TODO(@pawel) — consider jobId: tagId.
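
The producer-side guard described for shopviimport, and the deterministic jobId proposed for exampleexplicitness, could look roughly like this. Both functions are hypothetical and not taken from shop-vi-import.service.ts or tag.service.ts. Treating a missing record as "enqueue" is an assumption, and so is the jobId prefix: recent BullMQ releases are understood to reject purely numeric custom ids, so verify against the installed version before using a bare tagId.

```typescript
// Status values named in the shopviimport bullet above.
type ImportStatus = 'PENDING' | 'FAILED' | 'IN_PROGRESS' | 'COMPLETED';

interface ExistingImport { id: number; status: ImportStatus }

// Producer-side guard: enqueue when there is no record yet (assumption), or
// when the previous run never started (PENDING) or ended in FAILED.
function shouldEnqueueImport(existing: ExistingImport | undefined): boolean {
  if (existing === undefined) return true;
  return existing.status === 'PENDING' || existing.status === 'FAILED';
}

// Hypothetical helper for the exampleexplicitness TODO: one deterministic
// jobId per tag, prefixed so it is never a bare integer string.
function explicitnessJobId(tagId: string | number): string {
  return `recalculate-for-tag-${tagId}`;
}
```

With a deterministic jobId, a second enqueue for the same tag would be dropped while the first job is still present in Redis, which is the same mechanism mediaflows relies on.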

5. Failure modes

| Mode | Detection | Mitigation |
| --- | --- | --- |
| Redis unreachable | Connection errors on BullModule.forRootAsync boot or queue.add failures during request handling. Brain replicas crash-loop on Railway if Redis is down at startup. | Verify REDIS_HOST / REDIS_PORT / REDIS_PASSWORD and Railway Redis service status (see brain runbook). Producers should not retry inline; surface 5xx and let the upstream caller retry. |
| Processor crash mid-job | BullMQ marks the job stalled; onStalled logs in media-flows.processor.ts and shop-vi-import.processor.ts. After the stall window the job is re-delivered to another worker (or the same one after restart). | For mediaflows (no attempts override) a stalled job becomes a failed job once retries are exhausted; for kitsune… 5 attempts cushion crashes. |
| No retry on transient provider errors (mediaflows) | Every transient FAL / RunPod / Kling 5xx becomes a permanent failure because attempts is unset. Failed-tab in Bull-Board fills with retryable provider errors. | TODO(@pawel): decide whether to set attempts >= 3 with exponential backoff at queue or per-add level. Today, mass retry is manual via Bull-Board or one-off scripts (brain runbook → Queue stuck). |
| Dead-letter overflow | Failed jobs retained 7 days (removeOnFail.age) capped at 5,000 jobs. A storm of ContentModerationException (expected user-rejected content) or RoundServiceUnavailableException can blow past the count cap and start dropping older failures before they're investigated. | Daily QueueCleanupService cron (0 0 * * *) trims aggressively. Investigate spikes via Bull-Board cluster-by-error before they age out. |
| Unbounded retries | kitsune… is the only queue with retries; capped at 5. Other queues do not retry. | Not currently a risk; revisit if attempts is added to other queues. |
| Hot key on mediaflows | concurrency: 500 per replica means a single Redis instance fans out to up to 500 in-flight LREM/BRPOPLPUSH commands per replica × N replicas. Latency spike on Redis surfaces as queue stalls. | Drop concurrency or replica count before scaling Redis. The brain runbook → Database connection saturation entry already calls this out for Postgres pressure (concurrency 500 also stresses pgbouncer). |
| Producer/processor payload drift | Producer adds field; processor never reads it. No runtime failure, just silently lost data. Or: processor expects a field not yet added by the producer → undefined reaches downstream code and throws deep in the stack. | Payload types are shared TS interfaces (KitsuneSfwImageFromNsfwGenerationJobPayload, ShopVIImportJobPayload, IGenerationJob). Treat them as a contract: change one side at a time, drain the queue between deploys, and add a lint guard against unconstrained as IGenerationJob casts in producers (media-scheduling.service.ts uses one today). |
| Missing correlation ID / trace context | ContextualProcessor.process warns "Job ${id} has no correlation ID - processing without context". Trace continuity into provider spans is lost. | All producers must enqueue via ContextualQueue (which injects correlationId + W3C traceparent/tracestate). New producers should not call raw Queue.add. |
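
One way to surface payload drift at the processor boundary rather than deep in the stack is a runtime type guard. The sketch below uses the ShopVIImportJobPayload fields listed in the queue table; the guard function itself is hypothetical and is not present in shop-vi-import.processor.ts.

```typescript
// Field list copied from the ShopVIImportJobPayload row in the queue table.
interface ShopVIImportJobPayload {
  jobId: number;
  directoryPath: string;
  characterPaths?: string[];
}

// Hypothetical runtime guard: narrows unknown job data to the payload type
// so a missing or mistyped field is rejected before any work starts.
function isShopVIImportJobPayload(data: unknown): data is ShopVIImportJobPayload {
  if (typeof data !== 'object' || data === null) return false;
  const candidate = data as Record<string, unknown>;
  if (typeof candidate['jobId'] !== 'number') return false;
  if (typeof candidate['directoryPath'] !== 'string') return false;
  const paths = candidate['characterPaths'];
  if (paths === undefined) return true; // optional field
  return Array.isArray(paths) && paths.every((p) => typeof p === 'string');
}
```

A processor could call the guard first and fail the job with a descriptive error when it returns false, which keeps drift visible in the Bull-Board failed tab.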

6. Observability

Bull-Board is wired at GET /queues, mounted in apps/brain/src/app.module.ts:118 via BullBoardModule.forRoot({ route: '/queues', ... }). Every queue in QUEUES is registered through apps/brain/src/modules/bullBoardQueues.module.ts, so new queue names show up automatically once added to the enum. Access is Clerk-protected (see brain runbook → Queue stuck). Use it to inspect Waiting / Active / Completed / Failed / Delayed / Stalled tabs and to retry individual jobs.

Tracing. ContextualProcessor propagates W3C trace context from job payload (traceparent, tracestate) and starts a BULLMQ_PROCESS span tagged with job.name, job.id, queue.name (apps/brain/src/common/context/processors/contextual-processor.ts). Spans land in Axiom alongside the request that produced the job — see Observability spine → Trace Propagation.
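
For reference when debugging a job's traceparent field, here is a minimal parser for the W3C Trace Context header format. It is a sketch and not the ContextualProcessor implementation; it accepts only version 00, and the function and type names are illustrative.

```typescript
interface TraceParent {
  traceId: string;   // 32 lowercase hex chars
  parentId: string;  // 16 lowercase hex chars
  sampled: boolean;  // lowest bit of the trace-flags byte
}

// W3C Trace Context, version 00: "00-<trace-id>-<parent-id>-<trace-flags>".
const TRACEPARENT_V00 = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceParent(header: string | undefined): TraceParent | undefined {
  if (header === undefined) return undefined;
  const match = TRACEPARENT_V00.exec(header.trim());
  if (match === null) return undefined;
  const traceId = match[1] ?? '';
  const parentId = match[2] ?? '';
  const flags = match[3] ?? '00';
  // The spec treats all-zero trace ids and parent ids as invalid.
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return undefined;
  return { traceId, parentId, sampled: (parseInt(flags, 16) & 0x01) === 1 };
}
```

The traceId returned here is the value to search for in Axiom when a job's span appears detached from the request that enqueued it.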

Logs. Each processor logs active/completed/failed/stalled events through ContextualLogger (pino), so logs carry the job’s correlationId. Search Axiom with correlationId:<id> or trace_id:<id> to follow a job end-to-end.

Custom metrics. QueueStatsService (apps/brain/src/modules/domain/generation/services/queue-stats.service.ts) computes estimated wait times for mediaflows based on getWaiting() / getActive(). Brain exports mediaflows BullMQ job-state gauges to Axiom OTLP as brain.queue.jobs.active, brain.queue.jobs.waiting, brain.queue.jobs.delayed, and brain.queue.jobs.failed, each tagged with queue.name=mediaflows. active means Brain claimed the job from BullMQ; it does not prove downstream provider/GPU execution.
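
The formula QueueStatsService uses is not documented here. As a purely hypothetical model of how waiting and active counts can turn into a wait estimate, the sketch below assumes jobs drain in waves of `slots` parallel jobs and that each wave takes one average job duration. The function name, the parameters, and the average duration input are all assumptions.

```typescript
// Hypothetical model, not the QueueStatsService formula.
// waiting / active: queue counts; slots: total concurrency across replicas;
// avgJobMs: assumed average job duration in milliseconds.
function estimateWaitMs(waiting: number, active: number, slots: number, avgJobMs: number): number {
  if (slots <= 0) throw new RangeError('slots must be positive');
  const freeSlots = Math.max(slots - active, 0);
  // A new job starts immediately when a slot is free after the backlog is served.
  if (waiting < freeSlots) return 0;
  const jobsAhead = waiting - freeSlots;
  // Each full wave ahead costs one average duration, plus one for our own slot to free up.
  const wavesAhead = Math.floor(jobsAhead / slots) + 1;
  return wavesAhead * avgJobMs;
}
```

Because active only means Brain claimed the job, an estimate of this kind inherits that blind spot: provider-side queueing is invisible to it.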

CLI. No first-party CLI; use redis-cli against Redis or Bull-Board for ad-hoc inspection.

7. Secrets

| Var | Purpose | Where |
| --- | --- | --- |
| REDIS_HOST | Redis host | Railway service env (prod), docker-compose.yml (redis) for dev |
| REDIS_PORT | Redis port (default 6379) | Same |
| REDIS_PASSWORD | Optional auth | Railway secret in prod; unset in dev |

Read in apps/brain/src/config/queue.config.ts. The root .env.example declares BRAIN_REDIS_HOST and BRAIN_REDIS_PORT (lines 210-211); the per-service apps/brain/.env.example does not restate them. REDIS_PASSWORD is unset in dev. TODO(@pawel): add the password var explicitly to the brain .env.example for prod parity. There is no BULL_* namespace; all knobs are in code.
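
A minimal sketch of turning these three variables into a Redis connection object, using the default port from the table. It is not the code in queue.config.ts: the function name is illustrative, and failing fast on a missing REDIS_HOST is an assumption, since the source does not state a default host.

```typescript
interface RedisConnection { host: string; port: number; password?: string }

// Takes the environment as a parameter so it can be exercised without process.env.
function redisConnectionFromEnv(env: Record<string, string | undefined>): RedisConnection {
  const host = env['REDIS_HOST'];
  if (host === undefined || host === '') {
    // Assumption: fail fast rather than guess a host.
    throw new Error('REDIS_HOST is required');
  }
  const port = Number(env['REDIS_PORT'] ?? '6379');
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`REDIS_PORT is not a valid port: ${env['REDIS_PORT']}`);
  }
  const password = env['REDIS_PASSWORD'];
  // Omit the key entirely when the password is unset, as in dev.
  return password ? { host, port, password } : { host, port };
}
```

In dev this would resolve to { host: 'redis', port: 6379 } given the docker-compose values; in prod the password key would be present.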

8. Local dev

Docker Compose ships a redis service used by brain in dev (docker-compose.yml, brain depends on redis and exports REDIS_HOST=redis, REDIS_PORT=6379). Bring up the full stack:

make dev-up-d # brain, redis, postgres, rest of the monorepo

Or just brain + Redis:

docker compose up -d redis
cd apps/brain && pnpm start:dev

Bull-Board is then available at http://localhost:3000/queues. Brain listens on process.env.PORT || 3000 (apps/brain/src/main.ts:88). Clerk auth gates /queues; dev-mode bypass behaviour is service-config-dependent.

To seed jobs locally, hit the producing HTTP/gRPC endpoint (e.g. trigger a generation via brisket → sirloin → brain) — there is no seed-jobs script today.

9. Cost model

Redis sizing is set in docker-compose.yml to 2 GB RAM for the dev container. Production sizing is governed by the Railway Redis plan; TODO(@pawel) — record current Railway tier and target headroom.

Drivers of Redis memory:

  • Active + waiting jobs — payload size × queue depth. IGenerationJob payloads are the largest (full IQueueGeneration plus prompts) and mediaflows peak depth scales with concurrent users.
  • Completed retention — 24h × completed-job rate × payload bytes (capped at 1,000 jobs).
  • Failed retention — 7d × failure rate × payload bytes (capped at 5,000 jobs).
  • Bull-Board is metadata-only, not a cost driver.

Daily QueueCleanupService cron prevents long-tail growth even when caps are loose.
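
The retention terms above can be turned into a back-of-envelope bound. The sketch uses the documented caps (24h / 1,000 completed, 7d / 5,000 failed); the traffic rates and the 4 KiB payload size are hypothetical placeholders, and the result ignores Redis per-key overhead, so treat it as a lower bound on real memory use.

```typescript
interface RetentionPolicy { maxAgeSeconds: number; maxCount: number }

// Upper bound on retained jobs: whichever limit (age or count) bites first.
function retainedJobs(jobsPerHour: number, policy: RetentionPolicy): number {
  const byAge = jobsPerHour * (policy.maxAgeSeconds / 3600);
  return Math.min(byAge, policy.maxCount);
}

function retainedBytes(jobsPerHour: number, payloadBytes: number, policy: RetentionPolicy): number {
  return retainedJobs(jobsPerHour, policy) * payloadBytes;
}

const COMPLETED: RetentionPolicy = { maxAgeSeconds: 24 * 3600, maxCount: 1000 };
const FAILED: RetentionPolicy = { maxAgeSeconds: 7 * 24 * 3600, maxCount: 5000 };

// Hypothetical traffic: 100 completions/h, 10 failures/h, 4 KiB payloads.
const completedBytes = retainedBytes(100, 4096, COMPLETED); // count cap applies: 1000 jobs
const failedBytes = retainedBytes(10, 4096, FAILED);        // age limit applies: 1680 jobs
```

With these placeholder numbers, retained history stays in the single-digit megabytes per queue, which suggests active and waiting depth on mediaflows is the term worth measuring first.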

10. Runbook hooks

Open items (TODO(@pawel))

  1. Confirm whether the missing attempts setting on mediaflows, exampleexplicitness, and shopviimport is intentional.
  2. Kitsune storage path determinism per (characterId, nsfwSourceType).
  3. Add REDIS_HOST/REDIS_PORT/REDIS_PASSWORD to apps/brain/.env.example.
  4. Wire BullMQ queue-depth + failed-rate metrics to Axiom for SLO alerting.
  5. Production Railway Redis tier and target memory headroom.
  6. Local Bull-Board auth posture in dev (Clerk or bypassed).
  7. Consider jobId: tagId for exampleexplicitness to prevent duplicate stacking.