Back to Trigger

Trigger.dev Advanced Tasks (v4)

rules/4.0.0/advanced-tasks.md

4.4.5 · 11.2 KB
Original Source

Trigger.dev Advanced Tasks (v4)

Advanced patterns and features for writing tasks

Tags & Organization

ts
import { runs, tags, task } from "@trigger.dev/sdk";

/**
 * Example task that tags its own run with the user and organization from the payload.
 *
 * @param payload.userId - Used to build the `user_<id>` tag.
 * @param payload.orgId - Used to build the `org_<id>` tag.
 * @returns `{ processed: true }` once both tags have been added.
 */
export const processUser = task({
  id: "process-user",
  run: async (payload: { userId: string; orgId: string }) => {
    // Add tags during execution
    await tags.add(`user_${payload.userId}`);
    await tags.add(`org_${payload.orgId}`);

    return { processed: true };
  },
});

// Trigger with tags
await processUser.trigger(
  { userId: "123", orgId: "abc" },
  { tags: ["priority", "user_123", "org_abc"] } // Max 10 tags per run
);

// Subscribe to tagged runs (`runs` must be imported from the SDK, see the import above)
for await (const run of runs.subscribeToRunsWithTag("user_123")) {
  console.log(`User task ${run.id}: ${run.status}`);
}

Tag Best Practices:

  • Use prefixes: user_123, org_abc, video:456
  • Max 10 tags per run, 1-64 characters each
  • Tags don't propagate to child tasks automatically

Concurrency & Queues

ts
import { task, queue } from "@trigger.dev/sdk";

// Shared queue for related tasks
const emailQueue = queue({
  name: "email-processing",
  concurrencyLimit: 5, // Max 5 emails processing simultaneously
});

// Queue used for per-user processing.
// In v4 a queue has to be declared ahead of time with queue(); it can no longer
// be created on demand by passing { name, concurrencyLimit } when triggering.
const userQueue = queue({
  name: "user-processing",
  concurrencyLimit: 2, // Limit applies separately to each concurrencyKey value
});

// Task-level concurrency
export const oneAtATime = task({
  id: "sequential-task",
  queue: { concurrencyLimit: 1 }, // Process one at a time
  run: async (payload) => {
    // Critical section - only one instance runs
  },
});

// Per-user concurrency
export const processUserData = task({
  id: "process-user-data",
  run: async (payload: { userId: string }) => {
    // Reference the pre-declared queue by name and partition it per user:
    // each distinct concurrencyKey gets its own slice of the queue's limit.
    await childTask.trigger(payload, {
      queue: "user-processing",
      concurrencyKey: payload.userId,
    });
  },
});

/**
 * Email-sending task that runs on the shared `emailQueue`,
 * so it is bound by that queue's concurrency limit.
 */
export const emailTask = task({
  id: "send-email",
  // Use shared queue
  queue: emailQueue,
  async run(payload: { to: string }) {
    // Send email logic
  },
});

Error Handling & Retries

ts
import { task, retry, AbortTaskRunError } from "@trigger.dev/sdk";

/**
 * Example task combining task-level retry settings, a `catchError` hook and
 * in-run retries (`retry.onThrow` / `retry.fetch`).
 *
 * @returns The result of `unstableApiCall(payload)`.
 */
export const resilientTask = task({
  id: "resilient-task",
  retry: {
    maxAttempts: 10,
    factor: 1.8, // Exponential backoff multiplier
    minTimeoutInMs: 500,
    maxTimeoutInMs: 30_000,
    randomize: false,
  },
  catchError: async ({ error, ctx }) => {
    // The thrown value can be anything, so narrow it before reading `code`
    const isFatal =
      typeof error === "object" &&
      error !== null &&
      "code" in error &&
      error.code === "FATAL_ERROR";

    if (isFatal) {
      throw new AbortTaskRunError("Cannot retry this error");
    }

    // Log error details
    console.error(`Task ${ctx.task.id} failed:`, error);

    // Override the retry schedule for this failure: retry in 1 minute.
    // (Returning nothing instead would fall back to the `retry` settings above.)
    return { retryAt: new Date(Date.now() + 60000) };
  },
  run: async (payload) => {
    // Retry specific operations
    const result = await retry.onThrow(
      async () => {
        return await unstableApiCall(payload);
      },
      { maxAttempts: 3 }
    );

    // Conditional HTTP retries: only rate limits (429) and server errors (5xx)
    await retry.fetch("https://api.example.com", {
      retry: {
        byStatus: {
          "429": { strategy: "backoff", maxAttempts: 5 },
          "500-599": { strategy: "backoff", maxAttempts: 5 },
        },
      },
    });

    return result;
  },
});

Machines & Performance

ts
/**
 * Example of a resource-intensive task pinned to a large machine.
 * Picks the parallel code path only when the run is actually executing on
 * the `large-2x` preset (the machine can be overridden at trigger time).
 */
export const heavyTask = task({
  id: "heavy-computation",
  machine: { preset: "large-2x" }, // 8 vCPU, 16 GB RAM
  maxDuration: 1800, // 30 minutes timeout
  run: async (payload, { ctx }) => {
    // Resource-intensive computation.
    // The preset of the current run is exposed as `ctx.machine.name`;
    // `machine` is accessed optionally in case it is not set on the context.
    if (ctx.machine?.name === "large-2x") {
      // Use all available cores
      return await parallelProcessing(payload);
    }

    return await standardProcessing(payload);
  },
});

// Override machine when triggering (the trigger option takes the preset name directly)
await heavyTask.trigger(payload, {
  machine: "medium-1x", // Override for this run
});

Machine Presets:

  • micro: 0.25 vCPU, 0.25 GB RAM
  • small-1x: 0.5 vCPU, 0.5 GB RAM (default)
  • small-2x: 1 vCPU, 1 GB RAM
  • medium-1x: 1 vCPU, 2 GB RAM
  • medium-2x: 2 vCPU, 4 GB RAM
  • large-1x: 4 vCPU, 8 GB RAM
  • large-2x: 8 vCPU, 16 GB RAM

Idempotency

ts
import { task, idempotencyKeys } from "@trigger.dev/sdk";

/**
 * Charges a customer at most once per order by triggering `chargeCustomer`
 * with an idempotency key derived from the order id.
 */
export const paymentTask = task({
  id: "process-payment",
  retry: { maxAttempts: 3 },
  async run(payload: { orderId: string; amount: number }) {
    const { orderId } = payload;

    // Automatically scoped to this task run, so if the task is retried, the idempotency key will be the same
    const paymentKey = await idempotencyKeys.create(`payment-${orderId}`);

    // Ensure payment is processed only once
    await chargeCustomer.trigger(payload, {
      idempotencyKey: paymentKey,
      // Key expires in 24 hours
      idempotencyKeyTTL: "24h",
    });
  },
});

// Payload-based idempotency
import { createHash } from "node:crypto";

/**
 * Builds a SHA-256 hex digest of a payload's JSON representation, suitable as
 * the input for a payload-based idempotency key.
 *
 * The digest follows `JSON.stringify`, so object property order matters:
 * `{ a: 1, b: 2 }` and `{ b: 2, a: 1 }` produce different hashes.
 *
 * @param payload - Any value; it is serialized with `JSON.stringify`.
 * @returns A 64-character lowercase hex string.
 */
function createPayloadHash(payload: unknown): string {
  // JSON.stringify yields `undefined` (not a string) for `undefined`, functions
  // and symbols, which would make hash.update() throw. Hash a fixed marker
  // instead; it cannot collide with real JSON because JSON strings are quoted.
  const serialized: string | undefined = JSON.stringify(payload);

  return createHash("sha256")
    .update(serialized ?? "undefined")
    .digest("hex");
}

/**
 * Triggers `processData` with an idempotency key derived from a hash of the
 * payload, so the key is the same whenever the serialized payload is the same.
 */
export const deduplicatedTask = task({
  id: "deduplicated-task",
  async run(payload) {
    // Key is built from the payload hash
    const dedupeKey = await idempotencyKeys.create(createPayloadHash(payload));

    await processData.trigger(payload, { idempotencyKey: dedupeKey });
  },
});

Metadata & Progress Tracking

ts
import { task, metadata } from "@trigger.dev/sdk";

/**
 * Processes `payload.items` one at a time and mirrors progress into run metadata
 * (`progress`, `totalItems`, `processedItems`, `status`, `logs`, `currentItem`).
 *
 * @returns The collected per-item results and how many were processed.
 */
export const batchProcessor = task({
  id: "batch-processor",
  run: async (payload: { items: any[] }) => {
    const { items } = payload;
    const totalItems = items.length;

    // Seed the progress metadata before any work starts
    metadata
      .set("progress", 0)
      .set("totalItems", totalItems)
      .set("processedItems", 0)
      .set("status", "starting");

    const results = [];

    for (const [index, item] of items.entries()) {
      const position = index + 1; // 1-based count of items handled so far

      // Process item
      results.push(await processItem(item));

      // Publish progress as a percentage plus a log line for this item
      metadata
        .set("progress", (position / totalItems) * 100)
        .increment("processedItems", 1)
        .append("logs", `Processed item ${position}/${totalItems}`)
        .set("currentItem", item.id);
    }

    // Final status
    metadata.set("status", "completed");

    return { results, totalProcessed: results.length };
  },
});

/**
 * Child task that writes to the metadata of other runs in its hierarchy:
 * sets a status on its parent and bumps a counter on the root run.
 */
export const childTask = task({
  id: "child-task",
  async run(payload) {
    // Parent run: record that this child is working
    metadata.parent.set("childStatus", "processing");
    // Root run: count this child
    metadata.root.increment("childrenCompleted", 1);

    return { processed: true };
  },
});

Advanced Triggering

Frontend Triggering (React)

tsx
"use client";
import { useTaskTrigger } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";

/**
 * Button that triggers the "my-task" task from the browser.
 * The button is disabled while the hook reports `isLoading`.
 *
 * @param accessToken - Token passed to `useTaskTrigger` to authorize the trigger.
 */
function TriggerButton({ accessToken }: { accessToken: string }) {
  const trigger = useTaskTrigger<typeof myTask>("my-task", { accessToken });
  const handleClick = () => trigger.submit({ data: "from frontend" });

  return (
    <button onClick={handleClick} disabled={trigger.isLoading}>
      Trigger Task
    </button>
  );
}

Large Payloads

ts
// For payloads > 512KB (max 10MB)
/**
 * Downloads a large JSON document from `payload.dataUrl` instead of receiving
 * it inline in the payload.
 *
 * @returns `{ processed }` with the `length` of the parsed JSON value.
 * @throws Error when the download does not return a successful HTTP status.
 */
export const largeDataTask = task({
  id: "large-data-task",
  run: async (payload: { dataUrl: string }) => {
    // Trigger.dev automatically handles large payloads
    // For > 10MB, use external storage
    const response = await fetch(payload.dataUrl);

    // fetch() only rejects on network failure; an expired presigned URL or a
    // missing object still resolves, so check the status before parsing.
    // The URL is left out of the message because presigned URLs embed credentials.
    if (!response.ok) {
      throw new Error(`Failed to download payload data: HTTP ${response.status}`);
    }

    const largeData = await response.json();

    return { processed: largeData.length };
  },
});

// Best practice: pass a presigned URL to the file rather than the file contents
const presignedDataUrl = "https://s3.amazonaws.com/bucket/large-file.json?presigned=true";
await largeDataTask.trigger({ dataUrl: presignedDataUrl });

Advanced Options

ts
await myTask.trigger(payload, {
  delay: "2h30m", // Delay execution
  ttl: "24h", // Expire if not started within 24 hours
  priority: 100, // Higher priority (time offset in seconds)
  tags: ["urgent", "user_123"],
  metadata: { source: "api", version: "v2" },
  // Name of a queue declared ahead of time with queue({ name: "priority-queue", ... }).
  // In v4 a queue (and its concurrencyLimit) cannot be created from trigger options.
  queue: "priority-queue",
  idempotencyKey: "unique-operation-id",
  idempotencyKeyTTL: "1h",
  machine: "large-1x", // Preset name; overrides the task's machine for this run
  maxAttempts: 5,
});

Hidden Tasks

ts
/**
 * Hidden task: it is not exported, so it is only used from within this file.
 *
 * @returns `{ processed }` holding `payload.data` upper-cased.
 */
const internalProcessor = task({
  id: "internal-processor",
  async run(payload: { data: string }) {
    const processed = payload.data.toUpperCase();
    return { processed };
  },
});

/**
 * Public entry point that delegates the actual work to the hidden
 * `internalProcessor` task and waits for its result.
 *
 * @returns `{ output }` with the processed string from the hidden task.
 * @throws Error when the hidden task's result is not ok.
 */
export const publicWorkflow = task({
  id: "public-workflow",
  async run(payload: { input: string }) {
    // Run the hidden task and wait for it to finish
    const outcome = await internalProcessor.triggerAndWait({ data: payload.input });

    // Guard: bail out when the child run did not succeed
    if (!outcome.ok) {
      throw new Error("Internal processing failed");
    }

    return { output: outcome.output.processed };
  },
});

Logging & Tracing

ts
import { task, logger } from "@trigger.dev/sdk";

/**
 * Example of structured logging and a custom trace span around a user lookup.
 *
 * @param payload.userId - Id of the user to fetch and process.
 * @returns Whatever `processUser(user)` returns.
 * @throws Error when no user is found; rethrows any error from `processUser`.
 */
export const tracedTask = task({
  id: "traced-task",
  run: async (payload: { userId: string }) => {
    logger.info("Task started", { userId: payload.userId });

    // Custom trace with attributes
    const user = await logger.trace(
      "fetch-user",
      async (span) => {
        span.setAttribute("user.id", payload.userId);
        span.setAttribute("operation", "database-fetch");

        const userData = await database.findUser(payload.userId);
        span.setAttribute("user.found", !!userData);

        return userData;
      },
      // Span options: initial attributes go under `attributes`
      { attributes: { userId: payload.userId } }
    );

    // The lookup may come back empty (see `user.found` above); fail with a
    // clear message instead of a TypeError on `user.id`.
    if (!user) {
      logger.error("User not found", { userId: payload.userId });
      throw new Error(`User ${payload.userId} not found`);
    }

    logger.debug("User fetched", { user: user.id });

    try {
      const result = await processUser(user);
      logger.info("Processing completed", { result });
      return result;
    } catch (error) {
      // Caught values are not guaranteed to be Error instances
      logger.error("Processing failed", {
        error: error instanceof Error ? error.message : String(error),
        userId: payload.userId,
      });
      throw error;
    }
  },
});

Usage Monitoring

ts
import { logger, task, usage } from "@trigger.dev/sdk";

/**
 * Example of reading usage (cost and duration) for the current run and for a
 * single measured operation.
 *
 * @returns The result of `expensiveOperation(payload)`.
 */
export const monitoredTask = task({
  id: "monitored-task",
  run: async (payload) => {
    // Get current run cost. The usage object is nested: compute figures live
    // under `compute.attempt` / `compute.total`, and `totalCostInCents` is the
    // overall cost of the run so far.
    const currentUsage = await usage.getCurrent();
    logger.info("Current cost", {
      costInCents: currentUsage.totalCostInCents,
      durationMs: currentUsage.compute.total.durationMs,
    });

    // Measure specific operation
    const { result, compute } = await usage.measure(async () => {
      return await expensiveOperation(payload);
    });

    logger.info("Operation cost", {
      costInCents: compute.costInCents,
      durationMs: compute.durationMs,
    });

    return result;
  },
});

Run Management

ts
// NOTE(review): this snippet omits its import; `runs` is presumably the
// export of the same name from "@trigger.dev/sdk" — confirm before copying.

// Cancel runs
await runs.cancel("run_123");

// Replay runs with same payload
await runs.replay("run_123");

// Retrieve run with cost details
const run = await runs.retrieve("run_123");
console.log(`Cost: ${run.costInCents} cents, Duration: ${run.durationMs}ms`);

Best Practices

  • Concurrency: Use queues to prevent overwhelming external services
  • Retries: Configure exponential backoff for transient failures
  • Idempotency: Always use for payment/critical operations
  • Metadata: Track progress for long-running tasks
  • Machines: Match machine size to computational requirements
  • Tags: Use consistent naming patterns for filtering
  • Large Payloads: Use external storage for files > 10MB
  • Error Handling: Distinguish between retryable and fatal errors

Design tasks to be stateless, idempotent, and resilient to failures. Use metadata for state tracking and queues for resource management.