BullMQ Integration Playbook
1. Overview
BullMQ is the async job runtime inside brain. Producers (HTTP/gRPC controllers and services) push work onto Redis-backed queues; processors run inside the same NestJS process and consume jobs concurrently. There are no standalone worker pods today — every brain replica is also a worker. The wiring uses @nestjs/bullmq and bullmq (see apps/brain/package.json).
Redis is the only state for queues. Brain’s owned data lives in Postgres (fennec schema) — the queue is operational glue, not a system of record. When jobs need durability beyond Redis (e.g. shop-VI imports), the long-lived state is mirrored in Prisma rows and the BullMQ job becomes a transient “go do this” trigger.
2. Architecture
```mermaid
flowchart LR
  subgraph brisket
    UI[brisket / fennec UI]
  end
  subgraph sirloin
    SIR[sirloin gRPC]
  end
  subgraph brain["brain (NestJS)"]
    PROD[Producer service<br/>InjectQueue]
    PROC[Processor<br/>@Processor + ContextualProcessor]
    SCHED[QueueCleanupService<br/>@Cron midnight]
  end
  REDIS[(Redis<br/>REDIS_HOST:REDIS_PORT)]
  PG[(Postgres<br/>fennec)]
  ROUND[round gRPC]
  FAL[FAL / RunPod / WaveSpeed / Kling]
  BB[Bull-Board UI<br/>GET /queues]

  UI -->|REST/tRPC| SIR
  SIR -->|gRPC| PROD
  PROD -->|queue.add jobName, payload, opts| REDIS
  REDIS -->|fetched by worker| PROC
  PROC -->|update state| PG
  PROC -->|inference| ROUND
  PROC -->|provider calls| FAL
  SCHED -->|clean completed/failed| REDIS
  REDIS --- BB
```

Producers and processors live in the same brain process. Wiring happens in module imports via BullModule.forRootAsync (root config, apps/brain/src/modules/domain/generation/generation.module.ts:67) plus per-queue BullModule.registerQueue / registerQueueAsync calls. Concurrency is set per processor on the @Processor decorator.
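Since every brain replica runs every processor, the decorator's concurrency is a per-replica number, and the fleet-wide ceiling on active jobs for a queue is concurrency × replicas. A minimal sketch of that arithmetic, using the concurrency values from the table in section 3; PROCESSOR_CONCURRENCY and maxInFlight are hypothetical names, and the replica count of 3 is an illustrative assumption, not the production value.

```typescript
// Per-queue worker concurrency as documented in section 3
// (set via the @Processor decorator in each processor class).
const PROCESSOR_CONCURRENCY: Record<string, number> = {
  mediaflows: 500,
  exampleexplicitness: 1,
  shopviimport: 1,
  kitsunesfwimagefromnsfwgeneration: 20,
};

// Every brain replica is also a worker, so each replica runs one worker per
// queue. The fleet-wide ceiling on active jobs is concurrency × replicas.
function maxInFlight(queue: string, replicas: number): number {
  const perReplica = PROCESSOR_CONCURRENCY[queue];
  if (typeof perReplica !== 'number') {
    throw new Error(`unknown queue: ${queue}`);
  }
  if (!Number.isInteger(replicas) || replicas < 0) {
    throw new Error(`replicas must be a non-negative integer, got ${replicas}`);
  }
  return perReplica * replicas;
}

// Hypothetical replica count; check Railway for the real value.
const replicas = 3;
for (const queue of Object.keys(PROCESSOR_CONCURRENCY)) {
  console.log(`${queue}: up to ${maxInFlight(queue, replicas)} active jobs`);
}
```

With 3 replicas, mediaflows alone can hold 1,500 jobs active at once, which is the number that matters for the Redis and pgbouncer pressure discussed under Failure modes.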
The queue name table:
| Queue (string id) | Constant |
|---|---|
| mediaflows | QUEUES.MEDIA_FLOWS |
| exampleexplicitness | QUEUES.EXAMPLE_EXPLICITNESS |
| shopviimport | QUEUES.SHOP_VI_IMPORT |
| kitsunesfwimagefromnsfwgeneration | QUEUES.KITSUNE_SFW_IMAGE_FROM_NSFW_GENERATION |
Source: apps/brain/src/common/constants.ts.
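The queue ids are plain strings behind the QUEUES constant. A sketch of that shape, written as a const object so it runs anywhere (the real constant is described as an enum; a string enum enumerates the same way with Object.values). ALL_QUEUE_NAMES and isQueueName are hypothetical helpers, included because iterating the values is how a module such as bullBoardQueues.module.ts can register every queue without listing them by hand.

```typescript
// Stand-in for QUEUES in apps/brain/src/common/constants.ts, sketched as a
// const object rather than an enum.
const QUEUES = {
  MEDIA_FLOWS: 'mediaflows',
  EXAMPLE_EXPLICITNESS: 'exampleexplicitness',
  SHOP_VI_IMPORT: 'shopviimport',
  KITSUNE_SFW_IMAGE_FROM_NSFW_GENERATION: 'kitsunesfwimagefromnsfwgeneration',
} as const;

// Union of the string ids: 'mediaflows' | 'exampleexplicitness' | ...
type QueueName = (typeof QUEUES)[keyof typeof QUEUES];

// All registered queue names, in declaration order.
const ALL_QUEUE_NAMES: QueueName[] = Object.values(QUEUES);

// Narrows an arbitrary string (e.g. from a URL or script argument).
function isQueueName(value: string): value is QueueName {
  return (ALL_QUEUE_NAMES as string[]).includes(value);
}

console.log(ALL_QUEUE_NAMES.join(', '));
```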
3. Queues + jobs
| Queue | Producer code path | Processor code path | Job name(s) | Payload | Concurrency | Retry policy |
|---|---|---|---|---|---|---|
| mediaflows | apps/brain/src/modules/domain/generation/services/media-scheduling.service.ts (processMediaGeneration); also used by media.service.ts and queue-stats.service.ts for lookups | apps/brain/src/modules/domain/generation/processors/media-flows.processor.ts (MediaFlowsProcessor) | media-flow-${mediaId} | IGenerationJob = { generationRequest: IQueueGeneration, jobId: string } (extends QueueJobData with correlationId, currentUser, traceparent, tracestate). Defined in apps/brain/src/modules/domain/generation/interfaces/generation.interface.ts:114 | 500 (very high; see Failure modes) | Global defaults: removeOnComplete after 24h / 1000 jobs, removeOnFail after 7d / 5000 jobs. No queue-level attempts override → defaults to 1 (no retry on transient errors). TODO(@pawel): confirm intentional. |
| exampleexplicitness | apps/brain/src/modules/domain/generation/services/tag.service.ts (recalculate-for-tag) | apps/brain/src/modules/domain/generation/processors/example-explicitness.processor.ts (ExampleExplicitnessProcessor) | recalculate-for-tag | ExampleExplicitnessJobData = { tagId, oldLevel, newLevel } extends QueueJobData | 1 (sequential; batch size 500 rows per job) | Global defaults only; no override |
| shopviimport | apps/brain/src/modules/domain/shop-vi/services/shop-vi-import.service.ts | apps/brain/src/modules/domain/shop-vi/processors/shop-vi-import.processor.ts (ShopVIImportProcessor) | import, webhook-import | ShopVIImportJobPayload = { jobId: number, directoryPath: string, characterPaths?: string[] } | 1 | Global defaults only |
| kitsunesfwimagefromnsfwgeneration | apps/brain/src/modules/domain/character/services/character.service.ts (generate) | apps/brain/src/modules/domain/character/processors/kitsune-sfw-image-from-nsfw-generation.processor.ts | generate | KitsuneSfwImageFromNsfwGenerationJobPayload = { characterId: string, nsfwSourceTypes?: KitsuneOnboardingImageType[] } | 20 | Per-queue override: attempts: 5, backoff: { type: 'exponential', delay: 10_000 } (apps/brain/src/modules/domain/character/kitsune-sfw-image-from-nsfw-generation-queue.constants.ts) |
Global default options (applied to every queue unless overridden) are in apps/brain/src/config/queue.config.ts.
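The defaults and the kitsune override can be written as plain option objects. This sketch uses local interfaces that mirror only the subset of BullMQ's job options used here (in BullMQ, removeOnComplete / removeOnFail accept { age, count } with age in seconds, and backoff.delay is in milliseconds). effectiveOptions is a hypothetical helper that models the rule stated above: override keys win, everything else comes from the defaults, and an attempts value that is never set means a single attempt. Where the real override is applied is not stated here; depending on the wiring, a per-queue defaultJobOptions object can replace the root one wholesale instead of merging key by key, so confirm against queue.config.ts.

```typescript
// Local stand-ins for the subset of BullMQ job options used here.
// BullMQ's KeepJobs.age is in seconds; backoff.delay is in milliseconds.
interface KeepJobs {
  age?: number;
  count?: number;
}

interface BackoffOptions {
  type: 'fixed' | 'exponential';
  delay: number;
}

interface JobOptionsSubset {
  attempts?: number;
  backoff?: BackoffOptions;
  removeOnComplete?: KeepJobs;
  removeOnFail?: KeepJobs;
}

const HOUR_S = 60 * 60;
const DAY_S = 24 * HOUR_S;

// Global defaults as described above: keep completed jobs for 24h or 1,000
// jobs, failed jobs for 7 days or 5,000 jobs. No attempts, so no retries.
const GLOBAL_DEFAULTS: JobOptionsSubset = {
  removeOnComplete: { age: 24 * HOUR_S, count: 1000 },
  removeOnFail: { age: 7 * DAY_S, count: 5000 },
};

// Per-queue override for kitsunesfwimagefromnsfwgeneration.
const KITSUNE_OVERRIDE: JobOptionsSubset = {
  attempts: 5,
  backoff: { type: 'exponential', delay: 10_000 },
};

// Override keys win; unset keys fall back to the defaults; attempts that is
// never set behaves as a single attempt.
function effectiveOptions(
  defaults: JobOptionsSubset,
  override: JobOptionsSubset = {},
): JobOptionsSubset & { attempts: number } {
  return { attempts: 1, ...defaults, ...override };
}

const mediaflowsOpts = effectiveOptions(GLOBAL_DEFAULTS);
const kitsuneOpts = effectiveOptions(GLOBAL_DEFAULTS, KITSUNE_OVERRIDE);
console.log(mediaflowsOpts.attempts, kitsuneOpts.attempts); // 1 5
```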
4. Idempotency strategy
Mixed: each queue handles duplicates differently.
- mediaflows uses an explicit jobId derived from the persisted genId at enqueue time: await this.contextualMediaFlowQueue.add(..., ..., { jobId }) (media-scheduling.service.ts). BullMQ deduplicates jobs sharing a jobId while the prior copy is still present, so a duplicate enqueue for the same generation row is a no-op. The Postgres Generation row is created before the job is added, so the row is the durable identity.
- shopviimport does not pass jobId. Idempotency is enforced one layer up in shop-vi-import.service.ts: the service looks up the existing ShopVIImportJob by directoryPath (or by the upstream webhook job_id) and only re-enqueues when status is PENDING/FAILED. Once IN_PROGRESS or COMPLETED, the producer short-circuits and returns the existing record.
- kitsunesfwimagefromnsfwgeneration does not pass jobId. Re-runs are tolerated; each attempt regenerates assets for a characterId. With attempts: 5 and exponential backoff, the same payload may run multiple times. Storage keys are not deterministic: kitsune-sfw-image-from-nsfw-generation.service.ts:134 calls buildOnboardingKey(characterId, uuidv4(), 'jpg'), so each retry writes a fresh object rather than overwriting. Garbage collection of orphaned keys is implicit only: the DB row points at the latest outputStorageKey, and earlier objects are left unreferenced. TODO(@pawel): consider keying by (characterId, nsfwSourceType) to overwrite, or to enable explicit cleanup.
- exampleexplicitness does not pass jobId. The job operates against a tag's current state and is safe to re-run, but stacking multiple jobs for the same tagId is not prevented. TODO(@pawel): consider jobId: tagId.
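The two guard styles can be modeled without Redis. This is a minimal sketch, not brain's code: InMemoryQueue is a hypothetical stand-in that reproduces only the jobId behaviour described above (adding a job whose jobId is still present returns the existing job; once retention removes it, the id can be reused), and shouldEnqueueShopVi mirrors the status check described for shop-vi-import.service.ts. The status union is assumed from the status names in the text, and treating "no existing row" as "enqueue" is an assumption.

```typescript
interface StoredJob<T> {
  id: string;
  name: string;
  data: T;
}

// Hypothetical stand-in for a BullMQ queue that only models jobId dedupe.
class InMemoryQueue<T> {
  private jobs = new Map<string, StoredJob<T>>();
  private nextId = 1;

  add(name: string, data: T, opts: { jobId?: string } = {}): StoredJob<T> {
    const id = opts.jobId ?? String(this.nextId++);
    const existing = this.jobs.get(id);
    if (existing) return existing; // duplicate enqueue is a no-op
    const job = { id, name, data };
    this.jobs.set(id, job);
    return job;
  }

  // Models removeOnComplete/removeOnFail: once removed, the id can be reused.
  remove(id: string): void {
    this.jobs.delete(id);
  }

  get size(): number {
    return this.jobs.size;
  }
}

// Producer-side guard described for shopviimport: re-enqueue only when the
// persisted row is PENDING or FAILED. A missing row enqueuing is assumed.
type ShopViImportStatus = 'PENDING' | 'IN_PROGRESS' | 'COMPLETED' | 'FAILED';

function shouldEnqueueShopVi(existing: { status: ShopViImportStatus } | null): boolean {
  if (existing === null) return true;
  return existing.status === 'PENDING' || existing.status === 'FAILED';
}

// The exampleexplicitness TODO: keying by tagId collapses stacked jobs.
const queue = new InMemoryQueue<{ tagId: string }>();
queue.add('recalculate-for-tag', { tagId: 't1' }, { jobId: 't1' });
queue.add('recalculate-for-tag', { tagId: 't1' }, { jobId: 't1' });
console.log(queue.size); // 1
```

The model also shows the limit of jobId dedupe: it only holds while the earlier job is retained, so after the 24h / 1,000-job completed window a repeat enqueue with the same id is accepted again.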
5. Failure modes
| Mode | Detection | Mitigation |
|---|---|---|
| Redis unreachable | Connection errors on BullModule.forRootAsync boot or queue.add failures during request handling. Brain replicas crash-loop on Railway if Redis is down at startup. | Verify REDIS_HOST / REDIS_PORT / REDIS_PASSWORD and Railway Redis service status (see brain runbook). Producers should not retry inline — surface 5xx and let the upstream caller retry. |
| Processor crash mid-job | BullMQ marks the job stalled; the onStalled handlers in media-flows.processor.ts and shop-vi-import.processor.ts log it. After the stall window the job is re-delivered to another worker (or to the same one after a restart). | mediaflows has no attempts override, so there is no retry cushion: a job that fails after stalling stays failed. For kitsune…, the 5 attempts cushion crashes. |
| No retry on transient provider errors (mediaflows) | Every transient FAL / RunPod / Kling 5xx becomes a permanent failure because attempts is unset, and the Failed tab in Bull-Board fills with retryable provider errors. | TODO(@pawel): decide whether to set attempts >= 3 with exponential backoff at queue or per-add level. Today, mass retry is manual via Bull-Board or one-off scripts (brain runbook → Queue stuck). |
| Dead-letter overflow | Failed jobs retained 7 days (removeOnFail.age) capped at 5,000 jobs. A storm of ContentModerationException (expected user-rejected content) or RoundServiceUnavailableException can blow past the count cap and start dropping older failures before they’re investigated. | Daily QueueCleanupService cron (0 0 * * *) trims aggressively. Investigate spikes via Bull-Board cluster-by-error before they age out. |
| Unbounded retries | kitsune… is the only queue with retries; capped at 5. Other queues do not retry. | Not currently a risk; revisit if attempts is added to other queues. |
| Hot key on mediaflows | concurrency: 500 per replica means up to 500 × N in-flight jobs (N = replica count), all driving LREM/BRPOPLPUSH commands against a single Redis instance. A Redis latency spike surfaces as queue stalls. | Drop concurrency or replica count before scaling Redis. The brain runbook → Database connection saturation entry already calls this out for Postgres pressure (concurrency 500 also stresses pgbouncer). |
| Producer/processor payload drift | A producer adds a field the processor never reads: no runtime failure, just silently lost data. Or the processor expects a field the producer does not send yet, so undefined reaches downstream code and throws deep in the stack. | Payload types are shared TS interfaces (KitsuneSfwImageFromNsfwGenerationJobPayload, ShopVIImportJobPayload, IGenerationJob). Treat them as a contract: change one side at a time, drain the queue between deploys, and add a lint guard against unchecked as IGenerationJob casts in producers (media-scheduling.service.ts uses one today). |
| Missing correlation ID / trace context | ContextualProcessor.process warns Job ${id} has no correlation ID - processing without context. Trace continuity into provider spans is lost. | All producers must enqueue via ContextualQueue (which injects correlationId + W3C traceparent/tracestate). New producers should not call raw Queue.add. |
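Retry timing for the only queue with retries follows from its backoff settings. A sketch assuming BullMQ's built-in exponential strategy without jitter, where the wait before the next attempt is 2^(attemptsMade - 1) × delay; treat that formula as an assumption to verify against the installed bullmq version. exponentialBackoffMs and retrySchedule are hypothetical helpers, and the totals exclude the time spent running each attempt. The same helper shows what the attempts >= 3 option in the mediaflows TODO would add.

```typescript
// Assumed BullMQ 'exponential' backoff without jitter: the delay before the
// next attempt is 2^(attemptsMade - 1) × delay, attemptsMade starting at 1.
function exponentialBackoffMs(attemptsMade: number, delayMs: number): number {
  return Math.round(Math.pow(2, attemptsMade - 1) * delayMs);
}

// Delays between consecutive attempts for a job that fails every time.
// `attempts` counts the first run, so there are attempts - 1 retries.
function retrySchedule(attempts: number, delayMs: number): number[] {
  const delays: number[] = [];
  for (let made = 1; made < attempts; made++) {
    delays.push(exponentialBackoffMs(made, delayMs));
  }
  return delays;
}

// kitsunesfwimagefromnsfwgeneration: attempts 5, exponential, 10 s base.
const kitsune = retrySchedule(5, 10_000);
console.log(kitsune); // [ 10000, 20000, 40000, 80000 ]
console.log(kitsune.reduce((a, b) => a + b, 0)); // 150000

// mediaflows today: attempts unset (1), so no retries at all.
console.log(retrySchedule(1, 10_000)); // []

// A hypothetical attempts: 3 for mediaflows with the same base delay.
console.log(retrySchedule(3, 10_000)); // [ 10000, 20000 ]
```

Under these assumptions a kitsune job that keeps failing spends about 2.5 minutes in backoff before it lands in Failed.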
6. Observability
Bull-Board is wired at GET /queues, mounted in apps/brain/src/app.module.ts:118 via BullBoardModule.forRoot({ route: '/queues', ... }). Every queue in QUEUES is registered through apps/brain/src/modules/bullBoardQueues.module.ts, so new queue names show up automatically once added to the enum. Access is Clerk-protected (see brain runbook → Queue stuck). Use it to inspect Waiting / Active / Completed / Failed / Delayed / Stalled tabs and to retry individual jobs.
Tracing. ContextualProcessor propagates W3C trace context from job payload (traceparent, tracestate) and starts a BULLMQ_PROCESS span tagged with job.name, job.id, queue.name (apps/brain/src/common/context/processors/contextual-processor.ts). Spans land in Axiom alongside the request that produced the job — see Observability spine → Trace Propagation.
Logs. Each processor logs active/completed/failed/stalled events through ContextualLogger (pino), so logs carry the job’s correlationId. Search Axiom with correlationId:<id> or trace_id:<id> to follow a job end-to-end.
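Trace continuity depends on each payload carrying a well-formed W3C traceparent (version-traceid-parentid-flags, lowercase hex). A minimal sketch of parsing it, useful when checking a job payload from Bull-Board by hand: QueueJobContext and parseTraceparent are hypothetical, the parser accepts only the version 00 layout, and it is not the code ContextualProcessor uses. The traceId it returns should be the value to use in a trace_id:<id> search in Axiom.

```typescript
// Trace fields that, per this doc, ContextualQueue injects into every payload.
interface QueueJobContext {
  correlationId?: string;
  traceparent?: string;
  tracestate?: string;
}

interface ParsedTraceparent {
  version: string;
  traceId: string;
  parentId: string;
  sampled: boolean;
}

// W3C Trace Context, version 00 layout: version-traceid-parentid-flags, lowercase hex.
const TRACEPARENT_RE = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceparent(header: string | undefined): ParsedTraceparent | null {
  if (!header) return null;
  const match = TRACEPARENT_RE.exec(header.trim());
  if (!match) return null;
  const [, version, traceId, parentId, flags] = match;
  // Version ff and all-zero trace or parent ids are invalid per the spec.
  if (version === 'ff' || /^0+$/.test(traceId) || /^0+$/.test(parentId)) {
    return null;
  }
  // Bit 0 of trace-flags is the "sampled" flag.
  return { version, traceId, parentId, sampled: (parseInt(flags, 16) & 0x01) === 1 };
}

// Hypothetical payload as it might appear in Bull-Board's job data view.
const payload: QueueJobContext = {
  correlationId: 'req-123',
  traceparent: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01',
};
const parsed = parseTraceparent(payload.traceparent);
console.log(parsed ? `trace_id:${parsed.traceId}` : 'no usable trace context');
// trace_id:4bf92f3577b34da6a3ce929d0e0e4736
```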
Custom metrics. QueueStatsService (apps/brain/src/modules/domain/generation/services/queue-stats.service.ts) computes estimated wait times for mediaflows based on getWaiting() / getActive(). Brain exports mediaflows BullMQ job-state gauges to Axiom OTLP as brain.queue.jobs.active, brain.queue.jobs.waiting, brain.queue.jobs.delayed, and brain.queue.jobs.failed, each tagged with queue.name=mediaflows. active means Brain claimed the job from BullMQ; it does not prove downstream provider/GPU execution.
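QueueStatsService's exact formula is not reproduced here. A back-of-the-envelope version like this one is handy for sanity-checking its numbers: estimateWaitMs is hypothetical, totalConcurrency is per-replica concurrency × replicas, and avgJobMs is an assumed average end-to-end duration. Because active only means Brain claimed the job, provider and GPU latency have to be folded into avgJobMs.

```typescript
// Snapshot of one queue, e.g. from BullMQ's queue.getWaitingCount() and
// queue.getActiveCount().
interface QueueSnapshot {
  waiting: number;
  active: number;
}

// Hypothetical estimate of how long a newly added job waits before a worker
// picks it up. Not the formula QueueStatsService uses.
function estimateWaitMs(
  snapshot: QueueSnapshot,
  totalConcurrency: number,
  avgJobMs: number,
): number {
  if (totalConcurrency <= 0) {
    throw new Error('totalConcurrency must be positive');
  }
  const freeSlots = Math.max(0, totalConcurrency - snapshot.active);
  // Free slots absorb the jobs already waiting; if one is left over,
  // the new job starts right away.
  if (snapshot.waiting < freeSlots) return 0;
  // Jobs, including the new one, that must wait for a busy slot to free up.
  const blocked = snapshot.waiting - freeSlots + 1;
  // Crude model: every slot turns over once per avgJobMs.
  return Math.ceil(blocked / totalConcurrency) * avgJobMs;
}

// 500 slots all busy, 999 jobs already waiting, jobs take ~30 s on average.
console.log(estimateWaitMs({ waiting: 999, active: 500 }, 500, 30_000)); // 60000
```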
CLI. No first-party CLI; use redis-cli against Redis or Bull-Board for ad-hoc inspection.
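For redis-cli inspection it helps to know where BullMQ keeps each state. A sketch assuming BullMQ's default key prefix bull (a custom prefix in the queue options would change it): states live under <prefix>:<queue>:<state>, wait and active are lists, delayed, completed and failed are sorted sets, and each job is a hash at <prefix>:<queue>:<jobId>. The helper names are hypothetical; verify the layout against the installed bullmq version, add -h / -p / -a for host, port and password, and stick to read-only commands against production.

```typescript
// Job states this sketch covers.
type JobState = 'wait' | 'active' | 'delayed' | 'completed' | 'failed';

// wait/active are lists (LLEN); delayed/completed/failed are sorted sets (ZCARD).
const COUNT_COMMAND: Record<JobState, 'LLEN' | 'ZCARD'> = {
  wait: 'LLEN',
  active: 'LLEN',
  delayed: 'ZCARD',
  completed: 'ZCARD',
  failed: 'ZCARD',
};

// Assumes BullMQ's default prefix 'bull'; pass the real prefix if brain sets one.
function stateKey(queue: string, state: JobState, prefix = 'bull'): string {
  return `${prefix}:${queue}:${state}`;
}

// Each job's data and options live in a hash keyed by its id.
function jobKey(queue: string, jobId: string, prefix = 'bull'): string {
  return `${prefix}:${queue}:${jobId}`;
}

// Read-only command that counts the jobs in one state.
function countCommand(queue: string, state: JobState, prefix = 'bull'): string {
  return `redis-cli ${COUNT_COMMAND[state]} ${stateKey(queue, state, prefix)}`;
}

console.log(countCommand('mediaflows', 'failed')); // redis-cli ZCARD bull:mediaflows:failed
console.log(`redis-cli HGETALL ${jobKey('mediaflows', '<jobId>')}`);
```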
7. Secrets
| Var | Purpose | Where |
|---|---|---|
| REDIS_HOST | Redis host | Railway service env (prod); docker-compose.yml (redis) for dev |
| REDIS_PORT | Redis port (default 6379) | Same |
| REDIS_PASSWORD | Optional auth | Railway secret in prod; unset in dev |
All three are read in apps/brain/src/config/queue.config.ts. The root .env.example declares BRAIN_REDIS_HOST and BRAIN_REDIS_PORT (lines 210-211); the per-service apps/brain/.env.example does not restate them. TODO(@pawel): add the password var explicitly to the brain .env.example for prod parity. There is no BULL_* namespace; all knobs are in code.
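A minimal sketch of turning the three variables into a connection object, assuming the defaults in the table (port 6379, password optional). redisConnectionFromEnv is hypothetical and fails fast when REDIS_HOST is missing, which is a design choice for the sketch and may differ from queue.config.ts; in the app you would pass process.env.

```typescript
interface RedisConnection {
  host: string;
  port: number;
  password?: string;
}

// Hypothetical sketch; the real parsing lives in apps/brain/src/config/queue.config.ts.
function redisConnectionFromEnv(env: Record<string, string | undefined>): RedisConnection {
  const host = env.REDIS_HOST;
  if (!host) {
    // Fail fast at boot rather than connecting to an unintended default.
    throw new Error('REDIS_HOST is not set');
  }
  const rawPort = env.REDIS_PORT;
  const port = rawPort ? Number(rawPort) : 6379; // documented default
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`REDIS_PORT is not a valid port: ${rawPort}`);
  }
  const connection: RedisConnection = { host, port };
  // Prod sets REDIS_PASSWORD; in dev it is unset, so no password is attached.
  if (env.REDIS_PASSWORD) {
    connection.password = env.REDIS_PASSWORD;
  }
  return connection;
}

// Dev values from docker-compose.yml.
console.log(redisConnectionFromEnv({ REDIS_HOST: 'redis', REDIS_PORT: '6379' }));
// { host: 'redis', port: 6379 }
```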
8. Local dev
Docker Compose ships a redis service that brain uses in dev (docker-compose.yml: the brain service depends on redis and sets REDIS_HOST=redis, REDIS_PORT=6379). Bring up the full stack:

```bash
make dev-up-d   # brain, redis, postgres, rest of the monorepo
```

Or just brain + Redis:

```bash
docker compose up -d redis
cd apps/brain && pnpm start:dev
```

Bull-Board is then available at http://localhost:3000/queues. Brain listens on process.env.PORT || 3000 (apps/brain/src/main.ts:88). Clerk auth gates /queues; whether dev mode bypasses it depends on service config (still an open item).
To seed jobs locally, hit the producing HTTP/gRPC endpoint (e.g. trigger a generation via brisket → sirloin → brain) — there is no seed-jobs script today.
9. Cost model
Redis sizing is set in docker-compose.yml to 2 GB RAM for the dev container. Production sizing is governed by the Railway Redis plan; TODO(@pawel) — record current Railway tier and target headroom.
Drivers of Redis memory:
- Active + waiting jobs: payload size × queue depth. IGenerationJob payloads are the largest (full IQueueGeneration plus prompts), and mediaflows peak depth scales with concurrent users.
- Completed retention: 24h × completed-job rate × payload bytes (capped at 1,000 jobs).
- Failed retention: 7d × failure rate × payload bytes (capped at 5,000 jobs).
- Bull-Board is metadata-only, not a cost driver.
Daily QueueCleanupService cron prevents long-tail growth even when caps are loose.
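The drivers above combine into a rough per-queue estimate. A sketch with hypothetical inputs: retention keeps the smaller of the jobs produced within the age window and the count cap, and per-job overhead (job hash fields, return values, stack traces on failed jobs) is folded into avgPayloadBytes, so treat the result as an order-of-magnitude figure that the daily cleanup cron can only lower. QueueMemoryInputs, retainedJobs and estimateQueueBytes are illustrative names.

```typescript
interface QueueMemoryInputs {
  liveJobs: number;        // active + waiting
  avgPayloadBytes: number; // serialized job size, overhead included
  completedPerHour: number;
  failedPerHour: number;
}

// Retention windows from the global defaults in section 3.
const COMPLETED_RETENTION = { hours: 24, maxJobs: 1000 };
const FAILED_RETENTION = { hours: 7 * 24, maxJobs: 5000 };

// Jobs older than the age window are removed, and at most maxJobs are kept.
function retainedJobs(perHour: number, r: { hours: number; maxJobs: number }): number {
  return Math.min(perHour * r.hours, r.maxJobs);
}

function estimateQueueBytes(q: QueueMemoryInputs): number {
  const jobs =
    q.liveJobs +
    retainedJobs(q.completedPerHour, COMPLETED_RETENTION) +
    retainedJobs(q.failedPerHour, FAILED_RETENTION);
  return jobs * q.avgPayloadBytes;
}

// Hypothetical mediaflows numbers: 2,000 live jobs of ~8 KiB,
// 600 completions and 20 failures per hour.
const bytes = estimateQueueBytes({
  liveJobs: 2000,
  avgPayloadBytes: 8 * 1024,
  completedPerHour: 600,
  failedPerHour: 20,
});
console.log(`${(bytes / 1024 / 1024).toFixed(1)} MiB`); // 49.7 MiB
```

In this example the completed count cap (1,000) binds long before the 24h window, while failures stay under their 5,000 cap, so failure rate is the input most worth measuring.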
10. Runbook hooks
- Stalled / failed jobs and Redis-down recovery: Brain runbook → Queue stuck.
- Round outage spillover into queue failures: Brain runbook → Round outage.
- Postgres pressure from queue concurrency: Brain runbook → Database connection saturation.
- Trace pivot from Bull-Board → Axiom: Observability spine → Debugging Playbook.
Open items (TODO(@pawel))
- Intentional lack of attempts on mediaflows, exampleexplicitness, shopviimport?
- Kitsune storage path determinism per (characterId, nsfwSourceType).
- Add REDIS_HOST / REDIS_PORT / REDIS_PASSWORD to apps/brain/.env.example.
- Wire BullMQ queue-depth + failed-rate metrics to Axiom for SLO alerting.
- Production Railway Redis tier and target memory headroom.
- Local Bull-Board auth posture in dev (Clerk or bypassed).
- Consider jobId: tagId for exampleexplicitness to prevent duplicate stacking.