plans/2026-05-06-observation-queue-engine-deep-dive.md
Date: 2026-05-06
If claude-mem replaces its observation queue with one of the two Redis-backed libraries, choose BullMQ, not Bee-Queue.
That said, the current observation queue is not a generic background job queue: it is a durable, per-session input stream feeding long-lived provider generators. A Redis-backed queue should not become the default for local installs unless claude-mem is willing to require, bundle, or supervise Redis. If Redis is not acceptable as a new operational dependency, the better path is to keep the SQLite queue and fix the contract and test drift.
Recommended path:
The active queue path is:
1. `src/services/worker/http/shared.ts` and `SessionRoutes.ts` ingest observation and summarize requests.
2. `SessionManager.queueObservation()` and `queueSummarize()` persist rows through `PendingMessageStore.enqueue()`.
3. `SessionQueueProcessor.createIterator()` claims one row at a time and wakes via a per-session `EventEmitter`.
4. `ClaudeProvider`, `GeminiProvider`, and `OpenRouterProvider` consume `sessionManager.getMessageIterator(sessionDbId)`.
5. Provider responses are parsed by `processAgentResponse()`, then `SessionManager.clearPendingForSession()` clears that session's pending rows.

Key semantics that must survive any replacement:
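- Restart recovery resets `processing` rows back to `pending`.
- Observations are deduplicated by `content_session_id` + `tool_use_id`.
- Queue status stays visible through `/api/processing-status` and SSE.

Important mismatches found during the dive: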
- `PendingMessageStore` only models `pending` and `processing`.
- References to an older status/column model remain elsewhere: `processed`, `failed`, `retry_count`, `completed_at_epoch`, `failed_at_epoch`, and `worker_pid`.
- `storeObservationsAndMarkComplete()` still updates a row to `processed`, while the currently visible queue path clears all pending messages for the session after parsing.
- `src/services/sqlite/schema.sql` still creates `idx_pending_messages_worker_pid` even though the visible table definition has no `worker_pid` column.

Focused test run:
```sh
bun test tests/services/sqlite/PendingMessageStore.test.ts tests/services/queue/SessionQueueProcessor.test.ts
```
Result: 10 pass, 6 fail. The failures point to stale tests and contract drift:

- `PendingMessageStore.test.ts` passes `3` as a constructor argument, but the constructor now expects `onMutate?: () => void`.
- `SessionQueueProcessor.test.ts` expects retry-after-store-error behavior, but the current implementation logs and exits the iterator on claim failure.

This drift needs to be reconciled before swapping engines; otherwise the migration will encode inconsistent behavior.
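The claim loop behind `SessionQueueProcessor.createIterator()` is where the retry-versus-exit question lives. It claims one row, and otherwise parks on the session's `EventEmitter` until an enqueue wakes it. The sketch below is hypothetical: `ClaimStore`, `claimNext()`, `createSessionIterator()`, and the retry options are illustrative names, not the current code. It deliberately picks the behavior `SessionQueueProcessor.test.ts` expects (retry store errors, then surface them) so the difference from "log and exit" is concrete. Which contract is correct still has to be decided.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical stand-in for the SQLite store: claimNext() is assumed to
// atomically move the oldest `pending` row for a session to `processing`
// and return it, or return null when the session has nothing pending.
// It may throw on store errors (for example a busy database).
export interface ClaimStore<T> {
  claimNext(sessionDbId: number): T | null;
}

export interface ClaimRetryOptions {
  maxClaimRetries: number; // consecutive claim failures tolerated before rethrowing
  retryDelayMs: number; // pause before the next claim attempt after a failure
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Resolve on the next "wake" event for this session, or when the signal aborts.
function waitForWake(wake: EventEmitter, signal: AbortSignal): Promise<void> {
  if (signal.aborted) return Promise.resolve();
  return new Promise((resolve) => {
    const done = () => {
      wake.off("wake", done);
      signal.removeEventListener("abort", done);
      resolve();
    };
    wake.once("wake", done);
    signal.addEventListener("abort", done, { once: true });
  });
}

// Claims one message at a time for a single session and parks on the session's
// EventEmitter when nothing is pending. Claim errors are retried after a fixed
// delay and rethrown after maxClaimRetries consecutive failures.
export async function* createSessionIterator<T>(
  store: ClaimStore<T>,
  wake: EventEmitter,
  sessionDbId: number,
  signal: AbortSignal,
  opts: ClaimRetryOptions,
): AsyncGenerator<T, void, undefined> {
  let consecutiveFailures = 0;
  while (!signal.aborted) {
    let row: T | null = null;
    try {
      row = store.claimNext(sessionDbId);
      consecutiveFailures = 0;
    } catch (err) {
      consecutiveFailures += 1;
      if (consecutiveFailures > opts.maxClaimRetries) throw err;
      await sleep(opts.retryDelayMs);
      continue;
    }
    if (row !== null) {
      yield row;
      continue;
    }
    // Listeners are attached synchronously right after the empty claim, so a
    // wake emitted by a later enqueue cannot slip between the check and the wait.
    await waitForWake(wake, signal);
  }
}
```

The wake listener is attached in the same synchronous turn as the empty claim. An enqueue that emits `"wake"` afterwards therefore always resumes the loop, and any replacement wakeup mechanism needs to preserve that property.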
BullMQ sources checked:
Current package/repo facts captured on 2026-05-06:
- npm: `bullmq@…`.
- Runtime dependencies: `ioredis`, `cron-parser`, `msgpackr`, `node-abort-controller`, `semver`, `tslib`.
- Import: `import { Queue } from 'bullmq'`.

Strengths for claude-mem:
- Manual job processing via `Worker#getNextJob()`, `moveToCompleted()`, `moveToFailed()`, and lock extension. This matters because claude-mem's provider loop is closer to a stream consumer than a normal job processor.

Costs and risks:
- Redis persistence and `maxmemory-policy=noeviction` are recommended/required for correctness.
- `Worker` and `QueueEvents` need blocking/duplicated connections in some cases.
- `jobId` is useful for `tool_use_id` dedupe, but BullMQ custom job IDs must not contain `:`. Use a hash or a safe delimiter.
- Manual processing needs `extendLock()` for long provider calls, or a large lock duration.

Best BullMQ shape if adopted:
- One queue per session, named like `claude-mem:session:<safe-session-db-id>` or with a hashed content-session suffix.
- Worker concurrency `1`.
- `jobId` for observation dedupe: `obs_<sha256(contentSessionId + "\0" + toolUseId)>`.
- Use `removeOnComplete` aggressively if SQLite remains the source of truth for stored observations.
- Settings:
  - `CLAUDE_MEM_QUEUE_ENGINE=sqlite|bullmq`
  - `CLAUDE_MEM_REDIS_URL`
  - `CLAUDE_MEM_QUEUE_REDIS_PREFIX`
  - `CLAUDE_MEM_QUEUE_ENCRYPT_PAYLOADS=true|false` if sensitive fields are stored.

Bee-Queue sources checked:
Current package/repo facts captured on 2026-05-06:
- npm: `bee-queue@…`.
- License metadata: `NOASSERTION`.
- Runtime dependencies: `redis@^3.1.2`, `p-finally`, `promise-callbacks`.
- Type definitions: `./index.d.ts`.
- Import: `import BeeQueue from 'bee-queue'`.

Strengths for claude-mem:
Costs and risks:
- Depends on the `redis` v3 client line, not modern `redis` v4/v5 or `ioredis`.
- Delayed jobs require enabling `activateDelayedJobs` on at least one queue instance.

Conclusion: Bee-Queue is attractive if the only goal is a "small Redis queue for short jobs." claude-mem needs a durable session stream with strict per-session semantics, good TypeScript ergonomics, explicit recovery behavior, and long-term maintenance. Bee-Queue is the wrong tradeoff.
| Area | Current SQLite | BullMQ | Bee-Queue |
|---|---|---|---|
| Local-first install | Strong | Weak unless Redis is bundled/optional | Weak unless Redis is bundled/optional |
| Per-session FIFO | Strong | Medium with per-session queues; weak with one global queue | Medium with per-session queues; weak with one global queue |
| Restart durability | Strong, SQLite-backed | Strong if Redis persistence configured | Strong if Redis persistence configured |
| Stalled recovery | Custom/simple | Strong built-in | Built-in |
| TypeScript fit | Strong | Strong | Medium |
| Maintenance/activity | Internal | Strong | Medium |
| Operational complexity | Low | High | Medium-high |
| Queue observability | Custom/basic | Strong | Medium |
| Dependency footprint | Low | Larger | Small |
| Privacy/data locality | SQLite local file | Redis clear-text unless handled | Redis clear-text unless handled |
| Best use in claude-mem | Default | Optional advanced backend | Do not use |
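The "Best use in claude-mem" row translates into a small piece of startup logic. SQLite is the default and never touches Redis, BullMQ is opt-in and requires an explicit Redis URL, and Bee-Queue is not an option. This is a sketch only. It assumes the environment variable names proposed in the BullMQ section (`CLAUDE_MEM_QUEUE_ENGINE`, `CLAUDE_MEM_REDIS_URL`, `CLAUDE_MEM_QUEUE_REDIS_PREFIX`, `CLAUDE_MEM_QUEUE_ENCRYPT_PAYLOADS`). `resolveQueueEngine()` and the `"claude-mem"` default prefix are hypothetical.

```typescript
// Hypothetical startup config for the proposed queue-engine settings.
export type QueueEngineConfig =
  | { engine: "sqlite" }
  | { engine: "bullmq"; redisUrl: string; redisPrefix: string; encryptPayloads: boolean };

type Env = Record<string, string | undefined>;

export function resolveQueueEngine(env: Env): QueueEngineConfig {
  // Unset or empty means the local-first default; Redis is never touched.
  const engine = (env["CLAUDE_MEM_QUEUE_ENGINE"] ?? "").trim().toLowerCase() || "sqlite";
  if (engine === "sqlite") return { engine: "sqlite" };

  // Only SQLite and BullMQ are accepted; Bee-Queue is deliberately not an option.
  if (engine !== "bullmq") {
    throw new Error(`Unsupported CLAUDE_MEM_QUEUE_ENGINE="${engine}"; expected "sqlite" or "bullmq"`);
  }

  const redisUrl = (env["CLAUDE_MEM_REDIS_URL"] ?? "").trim();
  if (!redisUrl) {
    // Refuse to start rather than accept observations that could be dropped.
    throw new Error("CLAUDE_MEM_QUEUE_ENGINE=bullmq requires CLAUDE_MEM_REDIS_URL");
  }

  return {
    engine: "bullmq",
    redisUrl,
    // Hypothetical default prefix when CLAUDE_MEM_QUEUE_REDIS_PREFIX is unset.
    redisPrefix: (env["CLAUDE_MEM_QUEUE_REDIS_PREFIX"] ?? "").trim() || "claude-mem",
    encryptPayloads: env["CLAUDE_MEM_QUEUE_ENCRYPT_PAYLOADS"] === "true",
  };
}
```

A factory could then dynamically import a BullMQ engine only when `engine === "bullmq"`. That would keep `ioredis` out of the default startup path entirely.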
Phase 0: Fix the existing contract
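- Decide whether `pending_messages.status` is only `pending|processing`, or whether `processed|failed` is coming back.
- Update `schema.sql` and migrations so `worker_pid` indexes are not created after `worker_pid` is dropped.
- Update `storeObservationsAndMarkComplete()` to match the chosen status model, or remove it if no longer used.
- Pin down observation dedupe on `content_session_id` + `tool_use_id`, and update the stale `PendingMessageStore` and `SessionQueueProcessor` tests.

Phase 1: Add an adapter boundary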
Define a small interface around current behavior, not around BullMQ:
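```typescript
// PendingMessage, PendingMessageWithId, and EnqueueResult are defined elsewhere, not here.
interface ObservationQueueEngine {
  // Persist one observation/summarize request for a session.
  enqueue(sessionDbId: number, contentSessionId: string, message: PendingMessage): Promise<EnqueueResult>;
  // Yield claimed messages for one session until `signal` aborts.
  createIterator(sessionDbId: number, signal: AbortSignal, onIdleTimeout?: () => void): AsyncIterableIterator<PendingMessageWithId>;
  // Clear the session's pending rows once the provider response has been parsed.
  clearPendingForSession(sessionDbId: number): Promise<number>;
  // Restart recovery: move `processing` rows back to `pending`.
  resetProcessingToPending(sessionDbId: number): Promise<number>;
  getPendingCount(sessionDbId: number): Promise<number>;
  getTotalQueueDepth(): Promise<number>;
  close(): Promise<void>;
}
```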
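Both engines need the same observation identity so that `enqueue()` reports duplicates consistently, whichever backend is active. The following is a minimal sketch using Node's built-in `crypto`. It follows the `obs_<sha256(contentSessionId + "\0" + toolUseId)>` shape proposed for BullMQ job IDs. `observationJobId()` and `ObservationDedupe` are hypothetical helpers, and the in-memory `Set` only stands in for whatever durable check the real engine uses.

```typescript
import { createHash } from "node:crypto";

// Deterministic observation identity in the obs_<sha256(...)> shape proposed for
// BullMQ job IDs. Assuming neither ID contains NUL, the "\0" separator keeps
// ("ab", "c") and ("a", "bc") distinct. Hex output never contains ":", which
// BullMQ rejects in custom job IDs.
export function observationJobId(contentSessionId: string, toolUseId: string): string {
  const digest = createHash("sha256")
    .update(contentSessionId + "\0" + toolUseId, "utf8")
    .digest("hex");
  return `obs_${digest}`;
}

// Hypothetical in-memory guard showing how enqueue() could report duplicates the
// same way on either engine; a real engine would rely on something durable,
// e.g. a SQLite unique constraint or BullMQ ignoring an already-existing jobId.
export class ObservationDedupe {
  private readonly seen = new Set<string>();

  // Returns the job ID the first time a (session, tool use) pair is seen, else null.
  reserve(contentSessionId: string, toolUseId: string): string | null {
    const jobId = observationJobId(contentSessionId, toolUseId);
    if (this.seen.has(jobId)) return null;
    this.seen.add(jobId);
    return jobId;
  }
}
```

This interacts with the advice to use `removeOnComplete` aggressively. BullMQ only ignores a duplicate `jobId` while that job still exists in Redis. Once a completed job is removed, the same ID can be added again, so SQLite would still have to act as the durable dedupe record.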
Keep SqliteObservationQueueEngine as the first implementation by moving the current PendingMessageStore + SessionQueueProcessor behavior behind this interface.
Phase 2: Add a BullMQ backend behind a feature flag
- Implement `BullMqObservationQueueEngine`.
- Use a deterministic `jobId` for observation dedupe.
- Preserve `_originalTimestamp` in job data.
- Report Redis/queue health in `/api/health` only when the BullMQ backend is enabled.

Phase 3: Migration and fallback
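- Migrate existing `pending_messages` rows into BullMQ once, then mark or delete the migrated rows.
- Handle Redis being unavailable under `CLAUDE_MEM_QUEUE_ENGINE=bullmq` explicitly; do not silently drop observations.
- When the engine is `sqlite`, do not require Redis.

Phase 4: Tests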
- Verify that a duplicate `tool_use_id` is suppressed.

Do not do a direct swap from SQLite to either library.
If the product goal is to keep claude-mem easy to install and local-first, invest in the current SQLite queue: clean up the schema/status drift, restore tests, add explicit retries/failure rows if needed, and keep the in-process wakeup path.
If the product goal is to support distributed workers or stronger queue observability, add BullMQ as an optional backend through an adapter. It has the right maintenance profile, TypeScript support, recovery primitives, and docs. Bee-Queue is too narrow and too legacy-client-oriented for this role.