Back to Mastra

Inngest Workflow with Observability

examples/inngest/README.md

2025-12-1817.5 KB
Original Source

Inngest Workflow with Observability

This example demonstrates how to build an Inngest workflow with Mastra, including observability/tracing to capture workflow execution details.

Observability

This example includes Mastra's observability features to trace workflow execution. When you run the workflow, you'll see trace events in the console:

🚀 SPAN_STARTED
   Type: workflow_run
   Name: activity-planning-workflow-step2-if-else
   ID: <span-id>
   Trace ID: <trace-id>
────────────────────────────────────────────────────────────────────────────────
🚀 SPAN_STARTED
   Type: workflow_step
   Name: fetch-weather
   ...
────────────────────────────────────────────────────────────────────────────────
✅ SPAN_ENDED
   Type: workflow_step
   Name: fetch-weather
   Duration: 1234ms
   Output: { "date": "...", "maxTemp": 25, ... }

This proves that Mastra's observability captures:

  • Workflow execution (workflow_run spans)
  • Individual step execution (workflow_step spans)
  • Agent/model calls (agent_run, model_generation spans)
  • Step inputs and outputs
  • Timing information

See the Observability Configuration section for details on configuring exporters.

Setup

sh
npm install @mastra/inngest inngest @mastra/core @mastra/deployer @hono/node-server

docker run --rm -p 8288:8288 \
  inngest/inngest \
  inngest dev -u http://host.docker.internal:3000/inngest/api

Alternatively, you can use the Inngest CLI for local development by following the official Inngest Dev Server guide.

Define the Planning Agent

Define a planning agent which leverages an LLM call to plan activities given a location and corresponding weather conditions.

ts
// agents/planning-agent.ts
import { Agent } from '@mastra/core/agent';

// Create a new planning agent that uses the OpenAI model
const planningAgent = new Agent({
  id: 'planning-agent',
  name: 'planningAgent',
  model: 'openai/gpt-5.1',
  instructions: `
        You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations.

        📅 [Day, Month Date, Year]
        ═══════════════════════════

        🌡️ WEATHER SUMMARY
        • Conditions: [brief description]
        • Temperature: [X°C/Y°F to A°C/B°F]
        • Precipitation: [X% chance]

        🌅 MORNING ACTIVITIES
        Outdoor:
        • [Activity Name] - [Brief description including specific location/route]
          Best timing: [specific time range]
          Note: [relevant weather consideration]

        🌞 AFTERNOON ACTIVITIES
        Outdoor:
        • [Activity Name] - [Brief description including specific location/route]
          Best timing: [specific time range]
          Note: [relevant weather consideration]

        🏠 INDOOR ALTERNATIVES
        • [Activity Name] - [Brief description including specific venue]
          Ideal for: [weather condition that would trigger this alternative]

        ⚠️ SPECIAL CONSIDERATIONS
        • [Any relevant weather warnings, UV index, wind conditions, etc.]

        Guidelines:
        - Suggest 2-3 time-specific outdoor activities per day
        - Include 1-2 indoor backup options
        - For precipitation >50%, lead with indoor activities
        - All activities must be specific to the location
        - Include specific venues, trails, or locations
        - Consider activity intensity based on temperature
        - Keep descriptions concise but informative

        Maintain this exact formatting for consistency, using the emoji and section headers as shown.
      `,
});

export { planningAgent };

Define the Activity Planner Workflow

Define the activity planner workflow with 3 steps: one to fetch the weather via a network call, one to plan activities, and another to plan only indoor activities.

ts
// workflows/inngest-workflow.ts
import { init } from '@mastra/inngest';
import { Inngest } from 'inngest';
import { z } from 'zod';

const { createWorkflow, createStep } = init(
  new Inngest({
    id: 'mastra',
    baseUrl: `http://localhost:8288`,
  }),
);

// Helper function to convert weather codes to human-readable descriptions
function getWeatherCondition(code: number): string {
  const conditions: Record<number, string> = {
    0: 'Clear sky',
    1: 'Mainly clear',
    2: 'Partly cloudy',
    3: 'Overcast',
    45: 'Foggy',
    48: 'Depositing rime fog',
    51: 'Light drizzle',
    53: 'Moderate drizzle',
    55: 'Dense drizzle',
    61: 'Slight rain',
    63: 'Moderate rain',
    65: 'Heavy rain',
    71: 'Slight snow fall',
    73: 'Moderate snow fall',
    75: 'Heavy snow fall',
    95: 'Thunderstorm',
  };
  return conditions[code] || 'Unknown';
}

const forecastSchema = z.object({
  date: z.string(),
  maxTemp: z.number(),
  minTemp: z.number(),
  precipitationChance: z.number(),
  condition: z.string(),
  location: z.string(),
});

Step 1: Fetch weather data for a given city

ts
// workflows/inngest-workflow.ts
const fetchWeather = createStep({
  id: 'fetch-weather',
  description: 'Fetches weather forecast for a given city',
  inputSchema: z.object({
    city: z.string(),
  }),
  outputSchema: forecastSchema,
  execute: async ({ inputData }) => {
    if (!inputData) {
      throw new Error('Trigger data not found');
    }

    // Get latitude and longitude for the city
    const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1`;
    const geocodingResponse = await fetch(geocodingUrl);
    const geocodingData = (await geocodingResponse.json()) as {
      results: { latitude: number; longitude: number; name: string }[];
    };

    if (!geocodingData.results?.[0]) {
      throw new Error(`Location '${inputData.city}' not found`);
    }

    const { latitude, longitude, name } = geocodingData.results[0];

    // Fetch weather data using the coordinates
    const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}&current=precipitation,weathercode&timezone=auto,&hourly=precipitation_probability,temperature_2m`;
    const response = await fetch(weatherUrl);
    const data = (await response.json()) as {
      current: {
        time: string;
        precipitation: number;
        weathercode: number;
      };
      hourly: {
        precipitation_probability: number[];
        temperature_2m: number[];
      };
    };

    const forecast = {
      date: new Date().toISOString(),
      maxTemp: Math.max(...data.hourly.temperature_2m),
      minTemp: Math.min(...data.hourly.temperature_2m),
      condition: getWeatherCondition(data.current.weathercode),
      location: name,
      precipitationChance: data.hourly.precipitation_probability.reduce((acc, curr) => Math.max(acc, curr), 0),
    };

    return forecast;
  },
});

Step 2: Suggest activities (indoor or outdoor) based on weather

ts
// workflows/inngest-workflow.ts
const planActivities = createStep({
  id: 'plan-activities',
  description: 'Suggests activities based on weather conditions',
  inputSchema: forecastSchema,
  outputSchema: z.object({
    activities: z.string(),
  }),
  execute: async ({ inputData, mastra }) => {
    const forecast = inputData;

    if (!forecast) {
      throw new Error('Forecast data not found');
    }

    const prompt = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities:
      ${JSON.stringify(forecast, null, 2)}
      `;

    const agent = mastra?.getAgent('planningAgent');
    if (!agent) {
      throw new Error('Planning agent not found');
    }

    const response = await agent.stream([
      {
        role: 'user',
        content: prompt,
      },
    ]);

    let activitiesText = '';

    for await (const chunk of response.textStream) {
      process.stdout.write(chunk);
      activitiesText += chunk;
    }

    return {
      activities: activitiesText,
    };
  },
});

Step 3: Suggest indoor activities only (for rainy weather)

ts
// workflows/inngest-workflow.ts
const planIndoorActivities = createStep({
  id: 'plan-indoor-activities',
  description: 'Suggests indoor activities based on weather conditions',
  inputSchema: forecastSchema,
  outputSchema: z.object({
    activities: z.string(),
  }),
  execute: async ({ inputData, mastra }) => {
    const forecast = inputData;

    if (!forecast) {
      throw new Error('Forecast data not found');
    }

    const prompt = `In case it rains, plan indoor activities for ${forecast.location} on ${forecast.date}`;

    const agent = mastra?.getAgent('planningAgent');
    if (!agent) {
      throw new Error('Planning agent not found');
    }

    const response = await agent.stream([
      {
        role: 'user',
        content: prompt,
      },
    ]);

    let activitiesText = '';

    for await (const chunk of response.textStream) {
      process.stdout.write(chunk);
      activitiesText += chunk;
    }

    return {
      activities: activitiesText,
    };
  },
});

Define the activity planner workflow

ts
// workflows/inngest-workflow.ts
const activityPlanningWorkflow = createWorkflow({
  id: 'activity-planning-workflow-step2-if-else',
  inputSchema: z.object({
    city: z.string().describe('The city to get the weather for'),
  }),
  outputSchema: z.object({
    activities: z.string(),
  }),
})
  .then(fetchWeather)
  .branch([
    [
      // If precipitation chance is greater than 50%, suggest indoor activities
      async ({ inputData }) => {
        return inputData?.precipitationChance > 50;
      },
      planIndoorActivities,
    ],
    [
      // Otherwise, suggest a mix of activities
      async ({ inputData }) => {
        return inputData?.precipitationChance <= 50;
      },
      planActivities,
    ],
  ]);

activityPlanningWorkflow.commit();

export { activityPlanningWorkflow };

Register Agent and Workflow instances with Mastra class

Register the agents and workflow with the mastra instance. This allows access to the agents within the workflow.

ts
// index.ts
import { Mastra } from '@mastra/core';
import { serve as inngestServe } from '@mastra/inngest';
import { PinoLogger } from '@mastra/loggers';
import { Inngest } from 'inngest';
import { activityPlanningWorkflow } from './workflows/inngest-workflow';
import { planningAgent } from './agents/planning-agent';
import { realtimeMiddleware } from '@inngest/realtime/middleware';

// Create an Inngest instance for workflow orchestration and event handling
const inngest = new Inngest({
  id: 'mastra',
  baseUrl: `http://localhost:8288`, // URL of your local Inngest server
  isDev: true,
  middleware: [realtimeMiddleware()], // Enable real-time updates in the Inngest dashboard
});

// Create and configure the main Mastra instance
export const mastra = new Mastra({
  workflows: {
    activityPlanningWorkflow,
  },
  agents: {
    planningAgent,
  },
  server: {
    host: '0.0.0.0',
    apiRoutes: [
      {
        path: '/api/inngest', // API endpoint for Inngest to send events to
        method: 'ALL',
        createHandler: async ({ mastra }) => inngestServe({ mastra, inngest }),
      },
    ],
  },
  logger: new PinoLogger({
    name: 'Mastra',
    level: 'info',
  }),
});

Execute the activity planner workflow

Here, we'll get the activity planner workflow from the mastra instance, then create a run and execute the created run with the required inputData.

ts
// exec.ts
import { mastra } from './';
import { serve } from '@hono/node-server';
import { createHonoServer, getToolExports } from '@mastra/deployer/server';
import { tools } from '#tools';

const app = await createHonoServer(mastra, {
  tools: getToolExports(tools),
});

// Start the server on port 3000 so Inngest can send events to it
const srv = serve({
  fetch: app.fetch,
  port: 3000,
});

const workflow = mastra.getWorkflow('activityPlanningWorkflow');
const run = await workflow.createRun();

// Start the workflow with the required input data (city name)
// This will trigger the workflow steps and stream the result to the console
const result = await run.start({ inputData: { city: 'New York' } });
console.dir(result, { depth: null });

// Close the server after the workflow run is complete
srv.close();

After running the workflow, you can view and monitor your workflow runs in real time using the Inngest dashboard at http://localhost:8288.

Inngest Flow Control Configuration

Inngest workflows support advanced flow control features including concurrency limits, rate limiting, throttling, debouncing, and priority queuing. These features help manage workflow execution at scale and prevent resource overload.

Concurrency Control

Control how many workflow instances can run simultaneously:

ts
const workflow = createWorkflow({
  id: 'user-processing-workflow',
  inputSchema: z.object({ userId: z.string() }),
  outputSchema: z.object({ result: z.string() }),
  steps: [processUserStep],
  // Limit to 10 concurrent executions, scoped by user ID
  concurrency: {
    limit: 10,
    key: 'event.data.userId', // Per-user concurrency
  },
});

Rate Limiting

Limit the number of workflow executions within a time period:

ts
const workflow = createWorkflow({
  id: 'api-sync-workflow',
  inputSchema: z.object({ endpoint: z.string() }),
  outputSchema: z.object({ status: z.string() }),
  steps: [apiSyncStep],
  // Maximum 1000 executions per hour
  rateLimit: {
    period: '1h',
    limit: 1000,
  },
});

Throttling

Ensure minimum time between workflow executions:

ts
const workflow = createWorkflow({
  id: 'email-notification-workflow',
  inputSchema: z.object({ organizationId: z.string(), message: z.string() }),
  outputSchema: z.object({ sent: z.boolean() }),
  steps: [sendEmailStep],
  // Only one execution per 10 seconds per organization
  throttle: {
    period: '10s',
    limit: 1,
    key: 'event.data.organizationId',
  },
});

Debouncing

Delay execution until no new events arrive within a time window:

ts
const workflow = createWorkflow({
  id: 'search-index-workflow',
  inputSchema: z.object({ documentId: z.string() }),
  outputSchema: z.object({ indexed: z.boolean() }),
  steps: [indexDocumentStep],
  // Wait 5 seconds of no updates before indexing
  debounce: {
    period: '5s',
    key: 'event.data.documentId',
  },
});

Priority Queuing

Set execution priority for workflows:

ts
const workflow = createWorkflow({
  id: 'order-processing-workflow',
  inputSchema: z.object({
    orderId: z.string(),
    priority: z.number().optional(),
  }),
  outputSchema: z.object({ processed: z.boolean() }),
  steps: [processOrderStep],
  // Higher priority orders execute first
  priority: {
    run: 'event.data.priority ?? 50', // Dynamic priority, default 50
  },
});

Combined Flow Control

You can combine multiple flow control features:

ts
const workflow = createWorkflow({
  id: 'comprehensive-workflow',
  inputSchema: z.object({
    userId: z.string(),
    organizationId: z.string(),
    priority: z.number().optional(),
  }),
  outputSchema: z.object({ result: z.string() }),
  steps: [comprehensiveStep],
  // Multiple flow control features
  concurrency: {
    limit: 5,
    key: 'event.data.userId',
  },
  rateLimit: {
    period: '1m',
    limit: 100,
  },
  throttle: {
    period: '10s',
    limit: 1,
    key: 'event.data.organizationId',
  },
  priority: {
    run: 'event.data.priority ?? 0',
  },
});

All flow control features are optional. If not specified, workflows run with Inngest's default behavior. Flow control configuration is validated by Inngest's native implementation, ensuring compatibility and correctness.

For detailed information about flow control options and their behavior, see the Inngest Flow Control documentation.

Observability Configuration

This example uses @mastra/observability to trace workflow execution. The configuration is in index.ts:

ts
import { Observability, ConsoleExporter, DefaultExporter } from '@mastra/observability';

const observability = new Observability({
  configs: {
    default: {
      serviceName: 'inngest-workflow-example',
      sampling: { type: 'always' }, // Sample all traces
      exporters: [
        new ConsoleExporter(), // Logs traces to console
        new DefaultExporter(), // Persists traces to storage
      ],
    },
  },
});

export const mastra = new Mastra({
  // ... other config
  observability,
});

Using Production Exporters

For production, you can use other exporters:

Langfuse:

ts
import { LangfuseExporter } from '@mastra/langfuse';

new LangfuseExporter({
  publicKey: process.env.LANGFUSE_PUBLIC_KEY,
  secretKey: process.env.LANGFUSE_SECRET_KEY,
});

Datadog:

ts
import { DatadogExporter } from '@mastra/datadog';

new DatadogExporter({
  mlApp: 'my-app',
  apiKey: process.env.DD_API_KEY,
});

OpenTelemetry:

ts
import { OtelExporter } from '@mastra/otel-exporter';

new OtelExporter({
  provider: {
    signoz: {
      endpoint: 'https://ingest.signoz.io',
      apiKey: process.env.SIGNOZ_API_KEY,
    },
  },
});

For more information, see the Mastra Observability documentation.