.agents/features/workers.md
Workers are separate Node processes that poll the app for jobs and execute flows/triggers. The worker is the sandbox: each worker forks the engine in-process (`@activepieces/sandbox`, `createSandboxRuntime`); there is no separate sandbox pool or Cloud Run runtime indirection. The destination model is one box per worker (concurrency 1), scaling out horizontally with replicas (each capped small, e.g. 0.5 vCPU / 1 GB) so that an OOM kills one job, not a shared pool. A transitional compatibility mode still honours `AP_WORKER_CONCURRENCY` (N independent boxes).

Workers connect to the app over a Socket.IO channel: on connect, a worker fetches its runtime settings (`WorkerSettingsResponse`) and the app registers an RPC server (`WorkerToApiContract`) for that socket. Jobs are pulled by the worker via `poll()` rather than pushed. A worker advertises liveness and config through `MachineInformation` (heartbeat), whose `workerProps` carry its identity, including version.

In the default Docker image both `activepieces-app` and `activepieces-worker` run under PM2 from `WORKDIR /usr/src/app`; `AP_CONTAINER_TYPE` (`APP` / `WORKER` / `WORKER_AND_APP`) selects which of them start.
- `packages/server/api/src/app/workers/machine/machine-controller.ts`: Socket.IO listeners (`FETCH_WORKER_SETTINGS`, `DISCONNECT`); registers the RPC server per connection. The `DISCONNECT` handler returns that connection's in-flight jobs to the queue (`jobBroker.releaseConnectionJobs`) before `machineService.onDisconnect`.
- `packages/server/api/src/app/workers/job-queue/job-assignment-tracker.ts`: in-memory (`socket.id` → in-flight jobs) map (keyed by queue+jobId); recorded on poll, cleared on `completeJob`, drained on disconnect to bound BullMQ `active` to live worker concurrency.
- `packages/server/api/src/app/workers/machine/machine-service.ts`: `onConnection` / `onDisconnect`, `buildSettingsResponse` (emits `APP_VERSION`), worker listing.
- `packages/server/api/src/app/workers/rpc/worker-rpc-service.ts`: `createHandlers()`, covering `poll` (with version gate), `completeJob`, `extendLock`, progress/log RPCs, `getFlowBundle`, `prepareFlowBundleUpload`, `uploadFlowBundle`.
- `packages/server/worker/src/lib/worker.ts`: worker lifecycle (`worker.start/stop`), `pollAndExecute` loop (with version gate, tracks `inFlightJobs`), `getWorkerProps`; builds the `Runtime` via `createSandboxRuntime` and a per-job `Resolver` via `createResolver`. `stop()` drains in-flight jobs (`drainInFlightJobs`) before teardown; an unexpected disconnect aborts the runtime (`abortInFlightRuntime`).
- `packages/server/worker/src/lib/runtime/sandbox-config.ts`: bridges worker settings → `SandboxSettings` + cache base path (merges the env-only `AP_REUSE_SANDBOX` override into the fetched `WorkerSettings`).
- `packages/server/sandbox/`: standalone `@activepieces/sandbox` library holding the in-process sandbox (`createSandboxRuntime`, `sandbox-manager`, isolate/fork process makers), cache/piece installation, the worker-side `createResolver`, and the `Runtime` / `Resolver` / `ProvisionInput` type contracts (`src/lib/types.ts`).
- `packages/server/worker/src/lib/config/configs.ts`: worker env vars incl. `AP_REUSE_SANDBOX` (`WorkerSystemProp.REUSE_SANDBOX`, reuse the engine process between jobs) and `AP_WORKER_CONCURRENCY`.
- `packages/server/worker/src/lib/config/worker-settings.ts`: caches the `WorkerSettingsResponse` fetched on connect.
- `packages/server/utils/src/ap-version.ts`: `apVersionUtil.getCurrentRelease()`; both sides read the deploy-root `package.json` version.
- `packages/core/shared/src/lib/automation/workers/index.ts`: `WorkerProps`, `MachineInformation`, `WorkerSettingsResponse`, `WorkerToApiContract` contracts.

WORKER_AND_APP for self-host single-container vs dedicated worker fleets on Cloud).

Canonical term definitions live in the bounded-context glossaries; see CONTEXT-MAP.md.
`Resolver` / `Runtime`: the two roles of the in-process `@activepieces/sandbox` library that the worker drives per job.

The `Resolver` (worker-side, owns the only `apiClient`) turns a job into a fully materialized `ProvisionInput`: it resolves the `flowVersion`, piece metadata, and a ready (compiled) flow bundle, and disables the flow when a piece is missing. It returns a discriminated `ResolveResult` (`ready`, with the resolved `flowVersion` when a flow was passed; `flow-not-found`; `disabled`) rather than throwing for the expected missing-flow and disabled-flow cases.

The `Runtime` (the single in-process sandbox box) exposes `execute` / `getActiveExecutors` / `shutdown` and never reaches the app. `execute` owns the box lifecycle internally: acquire → provision (materialize pieces/code into the bind-mounted cache) → run one engine operation → release on success / invalidate on throw. It re-raises the sandbox `ActivepiecesError` codes (timeout / memory / log-size) that handlers already catch.

The `Resolver`'s bundle path can return a bundle hit from the flow bundle store (`getFlowBundle` RPC): when a locked flow version's bundle is already cached, it supplies the frozen flow definition, piece manifest, and compiled code directly, bypassing the per-piece `getPiece` RPC round-trips. Locked flows that change pieces must be re-locked to produce a new bundle; a freshly resolved locked version is published back (best-effort, after execution cache setup).

Bundle transport: when the `FLOW_BUNDLE` file is S3-backed and `S3_USE_SIGNED_URLS` is on, the bundle bytes never cross the Socket.IO RPC. `getFlowBundle` returns a signed GET URL, `prepareFlowBundleUpload` returns a signed PUT URL, and the worker reads/writes S3 directly via the built-in `fetch` (`sandbox/src/lib/utils/bundle-http.ts`, dependency-free). S3-backed bundles without signed URLs stream an inline `Buffer` over the socket (`uploadFlowBundle`).
On DB-backed storage, bundles are not persisted at all: `prepareFlowBundleUpload` returns `{ kind: 'skip' }` and the worker always builds inline via the legacy resolve path (DB storage gains nothing from bundle caching, and a null-data pre-save would throw). Any signed-link or fetch failure likewise degrades to the legacy `getVersion` + `getPiece` resolve path, so a bundle never fails a run.

`WorkerProps`: typed worker identity sent in every heartbeat (`EXECUTION_MODE`, `WORKER_CONCURRENCY`, `SANDBOX_MEMORY_LIMIT`, `REUSE_SANDBOX`, `version`). Previously a free-form `Record<string,string>`.

`WorkerSettingsResponse`: runtime config the app hands a worker on connect; now includes `APP_VERSION` (the app's release).

`connectionGeneration`: worker-side counter bumped on every disconnect; in-flight poll loops exit when their captured generation goes stale, so a reconnect starts fresh loops.

1. On connect, the worker emits `FETCH_WORKER_SETTINGS`; the app's `machineService.onConnection` returns `WorkerSettingsResponse` (incl. `APP_VERSION`) and registers `createHandlers` for the socket.
2. On each (re)connect, the worker shuts down the previous `Runtime` (fire-and-forget) and creates a fresh one, then spawns `concurrency` `pollAndExecute` loops. Recreating per (re)connect means an in-flight job from a prior connection is killed with its box (it fails fast and BullMQ retries it) rather than lingering on a reused box. Reusing a busy box let the new generation collide on the single-operation engine child and let the lingering job's lock lapse (→ BullMQ `Job stalled`).
3. Each loop calls `apiClient.poll(machineInfo)`; the app's `poll` handler returns the next job for the worker's queue, or `null`. Handing out a job moves it to BullMQ `active`, so the handler records the job under the polling connection's `socket.id` (`jobAssignmentTracker`, keyed by queue+jobId), because only the app knows which connection holds it. (Scoped to `socket.id`, not the stable `workerId`, so a late disconnect of an old socket can't reclaim jobs a reconnected socket already polled.)
4. While a job runs, the worker calls `extendLock`, then `completeJob`; `completeJob` clears the assignment.
5. On disconnect, `connectionGeneration++` stops the loops. The app returns that connection's still-held jobs to the queue (`jobBroker.releaseConnectionJobs(socket.id)`, called from the `DISCONNECT` handler and scoped to the socket so a late disconnect can't reclaim a reconnected socket's jobs), so they don't sit orphaned in `active` past worker concurrency (the deploy "Job stalled" storm) waiting on the minutes-long stalled-scan. The worker aborts its in-flight runtime (`abortInFlightRuntime`) so it stops executing a job the app is reclaiming. Socket.IO auto-reconnects (incl. a manual reconnect on `io server disconnect`, e.g. an app restart during a deploy) and the cycle repeats, recreating the runtime as in step 2.
6. On graceful shutdown (`stop()`, e.g. SIGTERM), the worker drains in-flight jobs first (`drainInFlightJobs`, ≤25s) so the executing job completes and reports before the runtime/socket are torn down, avoiding a kill+retry on a clean deploy. The stalled-scan remains the backstop for abrupt death (OOM/SIGKILL) where no drain is possible.

Payload resolution is engine-side, not worker-side. Jobs carry a `JobPayload` (`inline` value or `ref` `fileId`). The worker forwards it unchanged into the engine operation; the engine hydrates a `ref` via the file-download path (direct bytes or an S3 signed-link redirect). There is no worker→API payload-fetch RPC: the contract exposes no `getPayloadFile`.
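The per-connection job tracking described in the lifecycle above can be illustrated with a minimal sketch. This is not the contents of `job-assignment-tracker.ts`: the class name, the method names (`record`, `clear`, `drain`), and the `Assignment` shape are assumptions made for illustration. It only shows why keying by `socket.id` keeps a late disconnect of an old socket from releasing jobs that a reconnected socket holds.

```typescript
// Hypothetical sketch: names and shapes are assumptions, not the real tracker.
type Assignment = { queueName: string; jobId: string }

class JobAssignmentTracker {
  // socket.id -> (queue+jobId -> assignment)
  private readonly bySocket = new Map<string, Map<string, Assignment>>()

  private static key(a: Assignment): string {
    return `${a.queueName}:${a.jobId}`
  }

  // Called when poll hands a job to a connection.
  record(socketId: string, a: Assignment): void {
    const jobs = this.bySocket.get(socketId) ?? new Map<string, Assignment>()
    jobs.set(JobAssignmentTracker.key(a), a)
    this.bySocket.set(socketId, jobs)
  }

  // Called on completeJob: the connection no longer holds the job.
  clear(socketId: string, a: Assignment): void {
    const jobs = this.bySocket.get(socketId)
    if (!jobs) return
    jobs.delete(JobAssignmentTracker.key(a))
    if (jobs.size === 0) this.bySocket.delete(socketId)
  }

  // Called on disconnect: returns the jobs still held by this socket and forgets them.
  drain(socketId: string): Assignment[] {
    const jobs = this.bySocket.get(socketId)
    this.bySocket.delete(socketId)
    return jobs ? Array.from(jobs.values()) : []
  }
}

// Usage: the old socket polled jobs 1 and 2 and completed job 1.
const tracker = new JobAssignmentTracker()
tracker.record('socket-old', { queueName: 'q', jobId: '1' })
tracker.record('socket-old', { queueName: 'q', jobId: '2' })
tracker.clear('socket-old', { queueName: 'q', jobId: '1' })

// The worker reconnects on a new socket and polls job 3 before the old
// socket's disconnect event arrives.
tracker.record('socket-new', { queueName: 'q', jobId: '3' })

// The late disconnect of the old socket drains only what that socket still held.
const released = tracker.drain('socket-old')
```

In this sketch `released` contains only job 2, and job 3 stays recorded under `socket-new`. Had the map been keyed by a stable worker id, the late disconnect would have drained job 3 as well.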
During a rolling upgrade the app and worker fleets briefly run different builds. Mixing them risks flow-schema/contract skew and silent run corruption, so dispatch is gated on an exact release match, and both sides enforce it, whichever runs the newer build. Both sides decide via `apVersionUtil.versionsAreCompatible({ versionA, versionB })`, which is fail-closed: `undefined` (a pre-gate worker) or `UNKNOWN_VERSION` (`'0.0.0'`, a failed read) on either side is treated as incompatible; otherwise the pair is compatible iff the two real versions are equal.
- App side (`worker-rpc-service.ts#poll`): if the worker's `version` and the app's `getCurrentRelease()` aren't compatible, it withholds the job (returns `null`). An old worker can never receive jobs from a new app.
- Worker side (`worker.ts#pollAndExecute`): if the connected app's `APP_VERSION` and the worker's `AP_VERSION` aren't compatible, it pauses polling (`VERSION_MISMATCH_POLL_PAUSE_MS`, 10s) and retries. A new worker won't pull from an old app.
- An `UNKNOWN_VERSION` read failure does not self-heal (the version is cached for the process lifetime). It is logged at error level, and each process pages on-call once about its own unreadable version (`PAGE_ONCALL_WEBHOOK`), so the deployment (cwd/packaging) can be fixed.
- `apVersionUtil.getCurrentRelease()` reads `process.cwd()/package.json`, the deploy-root release version (e.g. `0.85.4`) for both processes under PM2, not a workspace `package.json`. On read failure it falls back to `UNKNOWN_VERSION` (`'0.0.0'`). The fallback is deliberately not symmetric-equal: two processes that both failed their read are treated as incompatible, because "both failed to read" is not "both are the same release" (a persistent packaging defect spanning releases would otherwise let a skewed pair pass).
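The fail-closed comparison described above can be sketched as follows. This is an illustrative stand-in, not the code in `ap-version.ts`: the `{ versionA, versionB }` parameter shape and the `'0.0.0'` sentinel come from the text, while `isRealVersion` and `gatePoll` are hypothetical helpers added to show how an app-side poll gate might use the result.

```typescript
// Sentinel for a failed package.json read, as described in the text.
const UNKNOWN_VERSION = '0.0.0'

type VersionPair = { versionA: string | undefined; versionB: string | undefined }

// Hypothetical helper: a version counts as "real" only if it was reported
// at all and is not the failed-read sentinel.
function isRealVersion(version: string | undefined): version is string {
  return version !== undefined && version !== UNKNOWN_VERSION
}

// Fail-closed: any missing or unknown side is incompatible, even when both
// sides are unknown. Otherwise require an exact match.
function versionsAreCompatible({ versionA, versionB }: VersionPair): boolean {
  if (!isRealVersion(versionA) || !isRealVersion(versionB)) {
    return false
  }
  return versionA === versionB
}

// Hypothetical app-side gate: withhold the job (return null) on a mismatch.
function gatePoll<J>(params: {
  workerVersion: string | undefined
  appVersion: string
  nextJob: () => J | null
}): J | null {
  const compatible = versionsAreCompatible({
    versionA: params.workerVersion,
    versionB: params.appVersion,
  })
  return compatible ? params.nextJob() : null
}
```

Because the unknown check runs before the equality check, two processes that both report `'0.0.0'` are rejected, which is the non-symmetric-equal behaviour the text calls out.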