Input Streams

The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while output streams let you read data from tasks, input streams let you push data into them.

<Note> To learn how to receive input stream data inside your tasks, see [Input Streams](/tasks/streams#input-streams) in the Streams doc. </Note>

Sending data to a running task

The recommended approach is to use defined input streams for full type safety:

ts
import { cancelSignal, approval } from "./trigger/streams";

// Cancel a running AI stream
await cancelSignal.send(runId, { reason: "User clicked stop" });

// Approve a draft
await approval.send(runId, { approved: true, reviewer: "reviewer@example.com" });

The `.send()` method is fully typed: the data parameter must match the generic type you defined on the input stream.
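
To make the typing claim concrete, here is a self-contained sketch of how a generic payload type can constrain a `send` call. `TypedInput`, `defineInput`, and the `sent` log are hypothetical stand-ins written for this illustration; they are not the SDK's API, and your real definitions live in your own streams file.

```typescript
// Hypothetical stand-in for a defined input stream; NOT the Trigger.dev SDK API.
interface TypedInput<T> {
  id: string;
  send(runId: string, data: T): Promise<void>;
}

// Records every send so the example can be inspected without a network call.
const sent: Array<{ streamId: string; runId: string; data: unknown }> = [];

function defineInput<T>(id: string): TypedInput<T> {
  return {
    id,
    async send(runId: string, data: T): Promise<void> {
      sent.push({ streamId: id, runId, data });
    },
  };
}

// The generic argument fixes the payload shape for every later send() call.
const approvalSketch = defineInput<{ approved: boolean; reviewer: string }>("approval");

async function demo(): Promise<void> {
  await approvalSketch.send("run_123", { approved: true, reviewer: "reviewer@example.com" });
  // The next line would be rejected at compile time because `approved` must be a boolean:
  // await approvalSketch.send("run_123", { approved: "yes", reviewer: "x" });
}
```

The point of the sketch is only that the payload type is declared once, where the stream is defined, and every caller inherits it.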

<Note> `.send()` works the same regardless of how the task is listening — whether it uses `.wait()` (suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know how the task is consuming the data. See [Input Streams](/tasks/streams#input-streams) for details on each receiving method. </Note>

Practical examples

Cancel from a Next.js API route

ts
import { cancelSignal } from "@/trigger/streams";

export async function POST(req: Request) {
  const { runId } = await req.json();

  await cancelSignal.send(runId, { reason: "User clicked stop" });

  return Response.json({ cancelled: true });
}

Approval workflow API

ts
import { approval } from "@/trigger/streams";

export async function POST(req: Request) {
  const { runId, approved, reviewer } = await req.json();

  await approval.send(runId, {
    approved,
    reviewer,
  });

  return Response.json({ success: true });
}

Remix action handler

ts
import { json, type ActionFunctionArgs } from "@remix-run/node";
import { approval } from "~/trigger/streams";

export async function action({ request }: ActionFunctionArgs) {
  const formData = await request.formData();
  const runId = formData.get("runId") as string;
  const approved = formData.get("approved") === "true";
  const reviewer = formData.get("reviewer") as string;

  await approval.send(runId, { approved, reviewer });

  return json({ success: true });
}
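
`FormData.get()` returns `FormDataEntryValue | null`, so the `as string` casts in the handler above hide the missing-field and file-upload cases. A minimal sketch of a stricter parser follows. `ApprovalForm`, `requireText`, and `parseApprovalForm` are hypothetical names for this illustration, and the code assumes the standard `FormData` global (available in browsers and in Node 18+).

```typescript
interface ApprovalForm {
  runId: string;
  approved: boolean;
  reviewer: string;
}

// Hypothetical helper: reads a required text field, rejecting missing, empty, or file values.
function requireText(form: FormData, name: string): string {
  const value = form.get(name);
  if (typeof value !== "string" || value.trim() === "") {
    throw new Error(`Missing or invalid form field: ${name}`);
  }
  return value;
}

function parseApprovalForm(form: FormData): ApprovalForm {
  return {
    runId: requireText(form, "runId"),
    // Mirrors the handler above: only the literal string "true" counts as approval.
    approved: form.get("approved") === "true",
    reviewer: requireText(form, "reviewer"),
  };
}
```

In the action, you could call `parseApprovalForm(await request.formData())` and pass the result's fields to `approval.send()`, returning a 400 response if it throws.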

Express handler

ts
import express from "express";
import { cancelSignal } from "./trigger/streams";

const app = express();
app.use(express.json());

app.post("/api/cancel", async (req, res) => {
  const { runId, reason } = req.body;

  await cancelSignal.send(runId, { reason });

  res.json({ cancelled: true });
});

Sending from another task

You can send input stream data from one task to another running task:

ts
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const reviewerTask = task({
  id: "auto-reviewer",
  run: async (payload: { targetRunId: string }) => {
    // Perform automated review logic.
    // performReview() is a placeholder for your own code; it is not defined in this snippet.
    const isApproved = await performReview();

    // Send approval to the waiting task
    await approval.send(payload.targetRunId, {
      approved: isApproved,
      reviewer: "auto-reviewer",
    });
  },
});

Error handling

The `.send()` method throws if:

  • The run has already completed, failed, or been canceled
  • The payload exceeds the 1MB size limit
  • The run ID is invalid

ts
import { cancelSignal } from "./trigger/streams";

try {
  await cancelSignal.send(runId, { reason: "User clicked stop" });
} catch (error) {
  console.error("Failed to send:", error);
  // Handle the error — the run may have already completed
}
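
If you would rather not repeat `try`/`catch` in every handler, one option is to convert the thrown error into a return value. The sketch below is illustrative only: `trySend` and `SendResult` are hypothetical names, and the sender is passed in as a plain function so the snippet runs without the SDK. It does not distinguish between the failure causes listed above, because this page does not document the error types that `.send()` throws.

```typescript
type SendResult = { ok: true } | { ok: false; message: string };

// Hypothetical wrapper: runs any async send function and reports failure as data.
async function trySend(send: () => Promise<unknown>): Promise<SendResult> {
  try {
    await send();
    return { ok: true };
  } catch (error) {
    // Thrown values are not guaranteed to be Error instances, so normalize them.
    const message = error instanceof Error ? error.message : String(error);
    return { ok: false, message };
  }
}
```

A call site might look like `const result = await trySend(() => cancelSignal.send(runId, { reason: "User clicked stop" }));`, after which the handler can branch on `result.ok` to choose a response status.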

Important notes

  • Maximum payload size per `.send()` call is 1MB
  • You cannot send data to a completed, failed, or canceled run
  • Data sent before a listener is registered inside the task is buffered and delivered when a listener attaches
  • Input streams require the current streams implementation (v2 is the default in SDK 4.1.0+). See Streams for details.
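
Since oversized payloads are rejected, it can help to estimate a payload's size before sending it. The following is a rough sketch under stated assumptions: `MAX_PAYLOAD_BYTES`, `payloadBytes`, and `fitsInputStreamLimit` are hypothetical names, the limit is assumed to apply to the UTF-8 size of the JSON-serialized payload, and 1MB is assumed to mean 1,048,576 bytes. Neither detail is stated on this page, so treat the result as an estimate rather than a guarantee.

```typescript
// Assumption: 1MB = 1024 * 1024 bytes, measured on the JSON-serialized payload.
const MAX_PAYLOAD_BYTES = 1024 * 1024;

function payloadBytes(data: unknown): number {
  const json = JSON.stringify(data);
  // JSON.stringify returns undefined for values such as `undefined` or functions.
  if (json === undefined) {
    throw new TypeError("Payload is not JSON-serializable");
  }
  // Count UTF-8 bytes, not UTF-16 code units, so non-ASCII text is measured correctly.
  return new TextEncoder().encode(json).length;
}

function fitsInputStreamLimit(data: unknown): boolean {
  return payloadBytes(data) <= MAX_PAYLOAD_BYTES;
}
```

Checking `fitsInputStreamLimit(payload)` before the send lets a handler return a clear validation error instead of relying on the thrown exception.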