The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while output streams let you read data from tasks, input streams let you push data into them.

<Note>
  To learn how to receive input stream data inside your tasks, see [Input Streams](/tasks/streams#input-streams) in the Streams doc.
</Note>

The recommended approach is to use defined input streams for full type safety:

```ts
import { cancelSignal, approval } from "./trigger/streams";

// Cancel a running AI stream
await cancelSignal.send(runId, { reason: "User clicked stop" });

// Approve a draft
await approval.send(runId, { approved: true, reviewer: "reviewer@example.com" });
```

The `.send()` method is fully typed: the data parameter must match the generic type you defined on the input stream.
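
To illustrate why a mismatched payload is rejected at compile time, here is a minimal sketch of the typing idea. `InputStreamLike`, `defineInput`, and the run ID `run_123` are hypothetical stand-ins written for this illustration, not the Trigger.dev SDK; they only model how a generic type parameter constrains `.send()`.

```typescript
// Hypothetical stand-in for a typed input stream; NOT the Trigger.dev SDK.
interface InputStreamLike<T> {
  id: string;
  send(runId: string, data: T): Promise<void>;
}

// Records every payload so the sketch can be inspected without a backend.
const sent: Array<{ streamId: string; runId: string; data: unknown }> = [];

// Hypothetical factory: the generic parameter T fixes the payload type of send().
function defineInput<T>(id: string): InputStreamLike<T> {
  return {
    id,
    async send(runId: string, data: T): Promise<void> {
      sent.push({ streamId: id, runId, data });
    },
  };
}

const approvalSketch = defineInput<{ approved: boolean; reviewer: string }>("approval");

// OK: the payload matches the generic type.
void approvalSketch.send("run_123", { approved: true, reviewer: "alice" });

// Compile-time error if uncommented: `approved` must be a boolean.
// void approvalSketch.send("run_123", { approved: "yes", reviewer: "alice" });
```

The check happens only in the type system, so it protects call sites written in TypeScript but not data that arrives untyped at runtime.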

```ts
import { cancelSignal } from "@/trigger/streams";

// Route handler that cancels a running task
export async function POST(req: Request) {
  const { runId } = await req.json();

  await cancelSignal.send(runId, { reason: "User clicked stop" });

  return Response.json({ cancelled: true });
}
```

```ts
import { approval } from "@/trigger/streams";

export async function POST(req: Request) {
  const { runId, approved, reviewer } = await req.json();

  await approval.send(runId, {
    approved,
    reviewer,
  });

  return Response.json({ success: true });
}
```
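
Because `req.json()` returns untyped data, the compile-time check on `.send()` cannot catch a malformed request body. One option is to validate the body before sending. The sketch below is self-contained; `parseApprovalBody` is a hypothetical helper, and the field shapes are assumed from the approval route above.

```typescript
// Shape assumed from the approval route above.
interface ApprovalRequest {
  runId: string;
  approved: boolean;
  reviewer: string;
}

// Hypothetical helper: returns a typed body, or null if any field is missing or mistyped.
function parseApprovalBody(body: unknown): ApprovalRequest | null {
  if (typeof body !== "object" || body === null) return null;

  const { runId, approved, reviewer } = body as Record<string, unknown>;

  if (typeof runId !== "string" || runId.length === 0) return null;
  if (typeof approved !== "boolean") return null;
  if (typeof reviewer !== "string") return null;

  return { runId, approved, reviewer };
}
```

In a route, this could be called as `parseApprovalBody(await req.json())`, returning a 400 response when the result is `null` and calling `approval.send()` only otherwise.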

```ts
import { json, type ActionFunctionArgs } from "@remix-run/node";
import { approval } from "~/trigger/streams";

export async function action({ request }: ActionFunctionArgs) {
  const formData = await request.formData();
  const runId = formData.get("runId") as string;
  const approved = formData.get("approved") === "true";
  const reviewer = formData.get("reviewer") as string;

  await approval.send(runId, { approved, reviewer });

  return json({ success: true });
}
```

```ts
import express from "express";
import { cancelSignal } from "./trigger/streams";

const app = express();
app.use(express.json());

app.post("/api/cancel", async (req, res) => {
  const { runId, reason } = req.body;

  await cancelSignal.send(runId, { reason });

  res.json({ cancelled: true });
});
```

You can send input stream data from one task to another running task:

```ts
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const reviewerTask = task({
  id: "auto-reviewer",
  run: async (payload: { targetRunId: string }) => {
    // Perform automated review logic...
    // performReview() is a placeholder for your own review function
    const isApproved = await performReview();

    // Send approval to the waiting task
    await approval.send(payload.targetRunId, {
      approved: isApproved,
      reviewer: "auto-reviewer",
    });
  },
});
```

The `.send()` method can throw, for example if the run has already completed, so wrap calls in a `try`/`catch`:

```ts
import { cancelSignal } from "./trigger/streams";

try {
  await cancelSignal.send(runId, { reason: "User clicked stop" });
} catch (error) {
  console.error("Failed to send:", error);
  // Handle the error; the run may have already completed
}
```
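
If a caller needs to branch on a failed send rather than only log it, the `try`/`catch` can be folded into a small helper. This is a sketch under assumptions: `Sender`, `SendResult`, and `trySend` are hypothetical names, and `fakeCancel` stands in for a real input stream so the example runs on its own.

```typescript
// Minimal shape of anything with a typed `.send()`; a hypothetical type for this sketch.
type Sender<T> = { send(runId: string, data: T): Promise<unknown> };

type SendResult = { ok: true } | { ok: false; message: string };

// Converts a thrown error into a value the caller can branch on.
async function trySend<T>(stream: Sender<T>, runId: string, data: T): Promise<SendResult> {
  try {
    await stream.send(runId, data);
    return { ok: true };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return { ok: false, message };
  }
}

// Fake stream that fails for one run ID, standing in for a run that already completed.
const fakeCancel: Sender<{ reason: string }> = {
  async send(runId) {
    if (runId === "run_done") throw new Error("run already completed");
  },
};
```

A route handler could then call `trySend(cancelSignal, runId, { reason })` and return an error status of its choosing when `ok` is `false`.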

The maximum payload size per `.send()` call is 1MB.
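
A payload can be checked against this limit before sending. The sketch below assumes the limit applies to the UTF-8 size of the JSON-serialized payload and that 1MB means 1,048,576 bytes; the text above states neither, so treat both as assumptions. `payloadBytes` and `fitsInputStreamLimit` are hypothetical helpers.

```typescript
// Assumption: 1MB = 1024 * 1024 bytes, measured on the JSON-serialized payload.
const MAX_PAYLOAD_BYTES = 1024 * 1024;

// Size in bytes of the payload once serialized as UTF-8 JSON.
function payloadBytes(data: unknown): number {
  return new TextEncoder().encode(JSON.stringify(data)).length;
}

// True when the serialized payload is within the assumed limit.
function fitsInputStreamLimit(data: unknown): boolean {
  return payloadBytes(data) <= MAX_PAYLOAD_BYTES;
}
```

Checking up front lets a backend reject an oversized request with a clear message instead of relying on the error thrown by `.send()`.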