rules/4.0.0/advanced-tasks.md
Advanced patterns and features for writing tasks
import { task, tags } from "@trigger.dev/sdk";
export const processUser = task({
id: "process-user",
run: async (payload: { userId: string; orgId: string }, { ctx }) => {
// Add tags during execution
await tags.add(`user_${payload.userId}`);
await tags.add(`org_${payload.orgId}`);
return { processed: true };
},
});
// Trigger with tags attached up front (a run may carry at most 10 tags).
const tagOptions = { tags: ["priority", "user_123", "org_abc"] };
await processUser.trigger({ userId: "123", orgId: "abc" }, tagOptions);
// Subscribe to tagged runs and stream status updates as they change.
// NOTE(review): `runs` is used here but not imported above — add `runs`
// to the `@trigger.dev/sdk` import when copying this snippet.
for await (const run of runs.subscribeToRunsWithTag("user_123")) {
  console.log(`User task ${run.id}: ${run.status}`);
}
Tag Best Practices:
Use prefixed, structured tag names such as `user_123`, `org_abc`, or `video:456` so runs can be filtered consistently.
import { task, queue } from "@trigger.dev/sdk";
// Shared queue for related tasks
// Shared queue: at most 5 email-processing runs execute simultaneously.
const emailQueue = queue({ name: "email-processing", concurrencyLimit: 5 });
// Task-level concurrency
// Serialized task: a queue concurrency limit of 1 guarantees a single
// in-flight run at any time — a critical section across runs.
export const oneAtATime = task({
  id: "sequential-task",
  queue: { concurrencyLimit: 1 },
  run: async (payload) => {
    // Body executes exclusively; no other instance overlaps it.
  },
});
// Per-user concurrency
// Per-user concurrency: each user gets an independent dynamic queue
// capped at 2 concurrent child runs.
export const processUserData = task({
  id: "process-user-data",
  run: async (payload: { userId: string }) => {
    const perUserQueue = {
      name: `user-${payload.userId}`,
      concurrencyLimit: 2,
    };
    // Override the child task's default queue with the user-specific one.
    await childTask.trigger(payload, { queue: perUserQueue });
  },
});
// Email sender; attaches to the shared emailQueue so every email task
// respects the same concurrency cap.
export const emailTask = task({
  id: "send-email",
  queue: emailQueue,
  run: async (payload: { to: string }) => {
    // Send email logic
  },
});
import { task, retry, AbortTaskRunError } from "@trigger.dev/sdk";
// Demonstrates layered retry handling: task-level exponential backoff,
// a custom catchError hook, and per-operation retry helpers.
export const resilientTask = task({
  id: "resilient-task",
  retry: {
    maxAttempts: 10,
    factor: 1.8, // Exponential backoff multiplier
    minTimeoutInMs: 500,
    maxTimeoutInMs: 30_000,
    randomize: false,
  },
  catchError: async ({ error, ctx }) => {
    // Unrecoverable errors: abort instead of burning retry attempts.
    if (error.code === "FATAL_ERROR") {
      throw new AbortTaskRunError("Cannot retry this error");
    }
    // Log error details with the failing task's id for context.
    console.error(`Task ${ctx.task.id} failed:`, error);
    // Schedule the next retry explicitly by returning a retryAt date.
    return { retryAt: new Date(Date.now() + 60000) }; // Retry in 1 minute
  },
  run: async (payload) => {
    // Retry just this flaky call up to 3 times on throw.
    const result = await retry.onThrow(
      async () => {
        return await unstableApiCall(payload);
      },
      { maxAttempts: 3 }
    );
    // HTTP fetch retried only on rate limits (429) or server errors (5xx).
    // NOTE(review): `response` is captured but unused below — illustrative only.
    const response = await retry.fetch("https://api.example.com", {
      retry: {
        maxAttempts: 5,
        condition: (response, error) => {
          return response?.status === 429 || response?.status >= 500;
        },
      },
    });
    return result;
  },
});
// Compute-heavy task pinned to a large machine with a 30-minute cap.
export const heavyTask = task({
  id: "heavy-computation",
  machine: { preset: "large-2x" }, // 8 vCPU, 16 GB RAM
  maxDuration: 1800, // seconds — 30 minutes
  run: async (payload, { ctx }) => {
    // The preset can be overridden per trigger, so check what we
    // actually got before choosing the parallel path.
    const onLargeMachine = ctx.machine.preset === "large-2x";
    return onLargeMachine
      ? await parallelProcessing(payload)
      : await standardProcessing(payload);
  },
});
// A single run can override the task's default machine preset.
const machineOverride = { machine: { preset: "medium-1x" } };
await heavyTask.trigger(payload, machineOverride);
Machine Presets:
- micro: 0.25 vCPU, 0.25 GB RAM
- small-1x: 0.5 vCPU, 0.5 GB RAM (default)
- small-2x: 1 vCPU, 1 GB RAM
- medium-1x: 1 vCPU, 2 GB RAM
- medium-2x: 2 vCPU, 4 GB RAM
- large-1x: 4 vCPU, 8 GB RAM
- large-2x: 8 vCPU, 16 GB RAM
import { task, idempotencyKeys } from "@trigger.dev/sdk";
// Payment task: a run-scoped idempotency key guarantees the charge
// fires at most once, even if this task is retried.
export const paymentTask = task({
  id: "process-payment",
  retry: { maxAttempts: 3 },
  run: async (payload: { orderId: string; amount: number }) => {
    // The key is stable across retries of the same run, so the child
    // trigger below is deduplicated automatically.
    const idempotencyKey = await idempotencyKeys.create(`payment-${payload.orderId}`);
    await chargeCustomer.trigger(payload, {
      idempotencyKey,
      idempotencyKeyTTL: "24h", // Deduplication window
    });
  },
});
// Payload-based idempotency
import { createHash } from "node:crypto";
/**
 * Derives a deterministic idempotency key from a payload by hashing its
 * JSON serialization.
 *
 * @param payload - Any JSON-serializable value. (Tightened from `any` to
 *   `unknown`: callers may still pass anything, but the body no longer
 *   silently bypasses type checking.)
 * @returns Lowercase hex SHA-256 digest of `JSON.stringify(payload)`.
 *   Note: object key order affects the serialization, so `{a, b}` and
 *   `{b, a}` produce different hashes.
 */
function createPayloadHash(payload: unknown): string {
  return createHash("sha256").update(JSON.stringify(payload)).digest("hex");
}
export const deduplicatedTask = task({
id: "deduplicated-task",
run: async (payload) => {
const payloadHash = createPayloadHash(payload);
const idempotencyKey = await idempotencyKeys.create(payloadHash);
await processData.trigger(payload, { idempotencyKey });
},
});
import { task, metadata } from "@trigger.dev/sdk";
// Processes a batch sequentially, publishing live progress via run metadata.
export const batchProcessor = task({
  id: "batch-processor",
  run: async (payload: { items: any[] }, { ctx }) => {
    const totalItems = payload.items.length;
    // Seed all progress fields before any work starts.
    metadata
      .set("progress", 0)
      .set("totalItems", totalItems)
      .set("processedItems", 0)
      .set("status", "starting");
    const results = [];
    for (const [index, item] of payload.items.entries()) {
      results.push(await processItem(item));
      // Publish progress after each completed item.
      metadata
        .set("progress", ((index + 1) / totalItems) * 100)
        .increment("processedItems", 1)
        .append("logs", `Processed item ${index + 1}/${totalItems}`)
        .set("currentItem", item.id);
    }
    metadata.set("status", "completed");
    return { results, totalProcessed: results.length };
  },
});
// Child task reporting upward: writes into its direct parent's metadata
// and bumps a counter on the root run of the hierarchy.
export const childTask = task({
  id: "child-task",
  run: async (payload, { ctx }) => {
    metadata.parent.set("childStatus", "processing");
    metadata.root.increment("childrenCompleted", 1);
    return { processed: true };
  },
});
"use client";
import { useTaskTrigger } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";
// Client component that triggers `myTask` from the browser using a
// scoped access token (never the server API key).
function TriggerButton({ accessToken }: { accessToken: string }) {
  // Hook is typed against the task, so `submit`'s payload is type-checked;
  // `handle` identifies the created run, `isLoading` guards re-clicks.
  const { submit, handle, isLoading } = useTaskTrigger<typeof myTask>("my-task", { accessToken });
  return (
    <button onClick={() => submit({ data: "from frontend" })} disabled={isLoading}>
      Trigger Task
    </button>
  );
}
// For payloads > 512KB (max 10MB). Beyond 10MB, pass a URL in the
// payload and fetch the data inside the run instead.
export const largeDataTask = task({
  id: "large-data-task",
  run: async ({ dataUrl }: { dataUrl: string }) => {
    const response = await fetch(dataUrl);
    const largeData = await response.json();
    return { processed: largeData.length };
  },
});
// Best practice: hand the task a presigned URL for very large files
// rather than embedding the data in the payload itself.
const presignedUrl = "https://s3.amazonaws.com/bucket/large-file.json?presigned=true";
await largeDataTask.trigger({ dataUrl: presignedUrl });
// Reference: the per-run options accepted by trigger().
await myTask.trigger(payload, {
  delay: "2h30m", // Delay execution
  ttl: "24h", // Expire if not started within 24 hours
  priority: 100, // Higher priority (time offset in seconds)
  tags: ["urgent", "user_123"], // Searchable labels (max 10 per run)
  metadata: { source: "api", version: "v2" }, // Arbitrary run metadata
  queue: {
    name: "priority-queue",
    concurrencyLimit: 10,
  },
  idempotencyKey: "unique-operation-id", // Deduplicate repeat triggers
  idempotencyKeyTTL: "1h", // How long the key deduplicates for
  machine: { preset: "large-1x" }, // Override the task's machine
  maxAttempts: 5, // Override the task's retry count
});
// Hidden task: not exported, so it is invisible outside this file and
// only reachable through tasks defined here.
const internalProcessor = task({
  id: "internal-processor",
  run: async ({ data }: { data: string }) => ({ processed: data.toUpperCase() }),
});
// Public entry point that delegates to the hidden task and waits for it.
export const publicWorkflow = task({
  id: "public-workflow",
  run: async (payload: { input: string }) => {
    const result = await internalProcessor.triggerAndWait({ data: payload.input });
    // Guard clause: surface child failure as a thrown error.
    if (!result.ok) {
      throw new Error("Internal processing failed");
    }
    return { output: result.output.processed };
  },
});
import { task, logger } from "@trigger.dev/sdk";
// Demonstrates structured logging and custom trace spans.
// NOTE(review): `payload` is untyped here but `payload.userId` is read —
// presumably { userId: string }; confirm against callers.
export const tracedTask = task({
  id: "traced-task",
  run: async (payload, { ctx }) => {
    logger.info("Task started", { userId: payload.userId });
    // Wrap the lookup in a named span and attach searchable attributes.
    const user = await logger.trace(
      "fetch-user",
      async (span) => {
        span.setAttribute("user.id", payload.userId);
        span.setAttribute("operation", "database-fetch");
        const userData = await database.findUser(payload.userId);
        span.setAttribute("user.found", !!userData);
        return userData;
      },
      { userId: payload.userId }
    );
    logger.debug("User fetched", { user: user.id });
    try {
      const result = await processUser(user);
      logger.info("Processing completed", { result });
      return result;
    } catch (error) {
      // Log with context, then rethrow so the run still fails/retries.
      logger.error("Processing failed", {
        error: error.message,
        userId: payload.userId,
      });
      throw error;
    }
  },
});
import { task, usage } from "@trigger.dev/sdk";
// Tracks compute cost from inside a run.
// NOTE(review): `logger` is used below but the adjacent import only brings
// in `task` and `usage` — add `logger` to the import when copying.
export const monitoredTask = task({
  id: "monitored-task",
  run: async (payload) => {
    // Cost and duration accrued by this run so far.
    const currentUsage = await usage.getCurrent();
    logger.info("Current cost", {
      costInCents: currentUsage.costInCents,
      durationMs: currentUsage.durationMs,
    });
    // Measure the cost of one specific operation in isolation.
    const { result, compute } = await usage.measure(async () => {
      return await expensiveOperation(payload);
    });
    logger.info("Operation cost", {
      costInCents: compute.costInCents,
      durationMs: compute.durationMs,
    });
    return result;
  },
});
// Run management via the `runs` client
// (requires `import { runs } from "@trigger.dev/sdk"`).
// Cancel runs
await runs.cancel("run_123");
// Replay runs with same payload
await runs.replay("run_123");
// Retrieve run with cost details
const run = await runs.retrieve("run_123");
console.log(`Cost: ${run.costInCents} cents, Duration: ${run.durationMs}ms`);
Design tasks to be stateless, idempotent, and resilient to failures. Use metadata for state tracking and queues for resource management.