.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

.. _howto/operator:llm:

LLMOperator
===========

Use :class:`~airflow.providers.common.ai.operators.llm.LLMOperator` for general-purpose LLM calls such as summarization, extraction, classification, structured output, or any other prompt-based task.

The operator sends a prompt to an LLM through :class:`~airflow.providers.common.ai.hooks.pydantic_ai.PydanticAIHook` and returns the output, which Airflow pushes to XCom.

.. seealso:: :ref:`Connection configuration <howto/connection:pydanticai>`

Basic Usage
-----------

Provide a prompt and the operator returns the LLM's response as a string:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_basic]
    :end-before: [END howto_operator_llm_basic]

Structured Output
-----------------

Set ``output_type`` to a Pydantic ``BaseModel`` subclass. The LLM is instructed to return structured data, and the model instance is pushed to XCom unchanged, so downstream tasks can type-hint the class directly (``def downstream(result: MyModel)``) and use attribute access (``result.field``).

The declared ``output_type``, and any ``BaseModel`` reachable from ``Union``, ``Optional``, or ``list`` shapes, is registered for XCom deserialization by the worker when it loads the DAG, before any task runs, so you do not need to edit ``[core] allowed_deserialization_classes``.

For this to work, the Pydantic class must be defined at module scope and bound to a module attribute that matches its ``__name__``. The worker cannot re-import the following, so it skips them with a warning at startup, and the value then fails to deserialize in the consuming task:

- classes nested inside a function or a ``@dag``-decorated body;
- parameterized generics;
- dynamically built classes whose ``__name__`` does not match the attribute they are bound to.
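
The re-import rule above can be checked by hand. The sketch below is a hypothetical helper, ``is_reimportable``, that approximates the rule with the standard library: it rejects classes whose ``__qualname__`` contains ``<locals>`` (defined in a function body) and requires the defining module to expose the class under its own ``__name__``. It is not Airflow's implementation and does not attempt to detect parameterized generics; plain classes stand in for Pydantic models because the check does not depend on the base class.

.. code-block:: python

    import sys


    def is_reimportable(cls: type) -> bool:
        """Approximate the module-scope rule; hypothetical, not Airflow's code."""
        # Classes defined inside a function (or a @dag body) carry "<locals>"
        # in their qualified name and are unreachable from the module namespace.
        if "<locals>" in cls.__qualname__:
            return False
        module = sys.modules.get(cls.__module__)
        if module is None:
            return False
        # The module must expose the class under the same name as __name__.
        return getattr(module, cls.__name__, None) is cls


    class Summary:  # module scope, bound to its own name
        pass


    def make_nested() -> type:
        class Nested:  # defined inside a function body
            pass

        return Nested


    # Built dynamically and bound to an attribute that differs from __name__.
    Renamed = type("Original", (), {})

    print(is_reimportable(Summary))        # True
    print(is_reimportable(make_nested()))  # False
    print(is_reimportable(Renamed))        # False

Running a check like this over your model module in a unit test is one way to catch a nested or renamed class before a worker logs the warning.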

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_structured_output_class]
    :end-before: [END howto_operator_llm_structured_output_class]

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_structured]
    :end-before: [END howto_operator_llm_structured]

Registration covers downstream tasks in the same DAG: every worker walks the loaded DAG's tasks at startup and registers each declared class, so it also works for mapped producers (``.expand(...)``) and for workers that load DAGs from a cache that bypasses operator construction.
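
The startup walk can be pictured as follows. In this sketch, a hypothetical ``Model`` base stands in for ``pydantic.BaseModel`` and a hypothetical ``FakeTask`` dataclass stands in for a loaded task; ``collect_model_classes`` uses ``typing.get_args`` to descend through ``Union``, ``Optional``, and ``list`` shapes. It illustrates the idea only and is not how Airflow structures its registration code.

.. code-block:: python

    from dataclasses import dataclass
    from typing import Optional, get_args, get_origin


    class Model:
        """Hypothetical stand-in for pydantic.BaseModel."""


    class Invoice(Model):
        pass


    class LineItem(Model):
        pass


    def collect_model_classes(tp: object, base: type = Model) -> set:
        """Return every subclass of ``base`` reachable from a type annotation."""
        found = set()
        # Only bare classes are candidates; generic aliases such as list[X]
        # have an origin and are unpacked below instead.
        if get_origin(tp) is None and isinstance(tp, type) and issubclass(tp, base):
            found.add(tp)
        # get_args() unpacks Union[...], Optional[...], and list[...] shapes.
        for arg in get_args(tp):
            found |= collect_model_classes(arg, base)
        return found


    @dataclass
    class FakeTask:
        """Hypothetical stand-in for a task loaded from the DAG file."""

        task_id: str
        output_type: object = str


    tasks = [
        FakeTask("extract", Optional[Invoice]),
        FakeTask("items", list[LineItem]),
        FakeTask("summarize"),  # plain str output: nothing to register
    ]

    registered = set()
    for task in tasks:
        registered |= collect_model_classes(task.output_type)

    print(sorted(cls.__name__ for cls in registered))  # ['Invoice', 'LineItem']

Because the walk is driven by declared types rather than by task execution, a mapped producer contributes its class once, however many instances it expands to.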

The Airflow UI's XCom viewer renders Pydantic instances through the stringify path, which produces a representation such as ``my_module.MyModel@version=1(field=value,...)`` without consulting the allow-list. The output is not rendered field by field, but the value is visible and no configuration is required.

The remaining gap is cross-DAG ``xcom_pull``: a task in a different DAG that pulls this XCom parses only its own DAG file, not the producer's, so the class is not registered automatically. To support that pattern, add the class's fully qualified name (or a glob that matches it) to ``[core] allowed_deserialization_classes``.
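
To find the name to add, combine the class's module path with its ``__qualname__``. The sketch below uses ``fnmatch`` to test candidate entries, on the assumption that Airflow matches allow-list entries with glob semantics comparable to ``fnmatch``; the ``my_dags.*`` module names and the ``is_allowed`` helper are hypothetical.

.. code-block:: python

    from fnmatch import fnmatch


    def qualified_name(cls: type) -> str:
        """Module path plus qualified class name, e.g. "my_dags.models.Summary"."""
        return f"{cls.__module__}.{cls.__qualname__}"


    def is_allowed(name: str, patterns: list) -> bool:
        # Assumption: each allow-list entry is treated as a glob pattern.
        return any(fnmatch(name, pattern) for pattern in patterns)


    class Summary:
        pass


    # Prints the defining module's path followed by ".Summary".
    print(qualified_name(Summary))

    # Hypothetical allow-list: one exact class name and one module-wide glob.
    allowed = ["my_dags.models.Summary", "my_dags.reports.*"]

    print(is_allowed("my_dags.models.Summary", allowed))   # True
    print(is_allowed("my_dags.reports.Invoice", allowed))  # True
    print(is_allowed("other_pkg.Summary", allowed))        # False

An exact name keeps the allow-list narrow; a module-wide glob is convenient when a shared models module feeds several consumer DAGs.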

If a downstream consumer needs the dict shape (for example, to forward the result to an external system that expects a JSON-style payload), pass ``serialize_output=True``; the operator then calls ``model_dump()`` before pushing to XCom. This restores the earlier dict-based behavior on demand without giving up the typed default.
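
The difference between the typed default and the dict shape matters most at a JSON boundary. Pydantic is not needed to see it: the sketch below swaps in a standard-library ``dataclass`` and ``dataclasses.asdict`` as an analogue of a ``BaseModel`` and ``model_dump()``. ``Summary`` and ``to_payload`` are hypothetical names.

.. code-block:: python

    import json
    from dataclasses import asdict, dataclass


    @dataclass
    class Summary:
        """Hypothetical result type; stands in for a Pydantic BaseModel."""

        title: str
        score: float


    def to_payload(result: Summary) -> str:
        # Analogous to serialize_output=True: convert to a plain dict first.
        return json.dumps(asdict(result))


    result = Summary(title="Q3 report", score=0.92)
    print(result.title)  # typed default: attribute access downstream

    try:
        json.dumps(result)  # the instance itself is not JSON-serializable
    except TypeError:
        print("needs a dict first")

    print(to_payload(result))  # {"title": "Q3 report", "score": 0.92}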

Agent Parameters
----------------

Pass additional keyword arguments to the pydantic-ai ``Agent`` constructor through ``agent_params``, for example ``retries``, ``model_settings``, or ``tools``. See the `pydantic-ai Agent docs <https://ai.pydantic.dev/api/agent/>`__ for the full list of supported parameters.

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_agent_params]
    :end-before: [END howto_operator_llm_agent_params]

Usage Limits
------------

Set ``usage_limits`` to a pydantic-ai `UsageLimits <https://ai.pydantic.dev/api/usage/#pydantic_ai.usage.UsageLimits>`__ instance to fail the task when the run exceeds a configured budget: request count, input/output tokens, or tool calls. The check happens inside pydantic-ai's run loop, so the limit applies even when ``retries`` triggers multiple model calls within a single task.

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_usage_limits]
    :end-before: [END howto_operator_llm_usage_limits]

Commonly used ``UsageLimits`` fields:

- ``request_limit``: maximum model requests per run (caps retry and tool-loop blow-ups). pydantic-ai defaults this to 50 when ``UsageLimits()`` is constructed without an explicit value, so ``UsageLimits(input_tokens_limit=4_000)`` silently inherits a 50-request cap. Set ``request_limit=None`` explicitly to disable it when you only want a token cap.
- ``input_tokens_limit`` / ``output_tokens_limit``: per-run token caps.
- ``total_tokens_limit``: combined input + output cap.
- ``tool_calls_limit``: maximum tool invocations (``AgentOperator`` only).

When a limit is hit, pydantic-ai raises ``UsageLimitExceeded``, which propagates to Airflow as a task failure; Airflow's standard retry policy then applies on top.
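
The ``request_limit`` default is easy to miss. The sketch below models it with hypothetical stand-ins, ``Limits`` and ``BudgetExceeded``, that mirror the defaults described above (50 requests, no token caps). The real pydantic-ai ``UsageLimits`` has more fields and performs its checks inside the run loop, so treat this as a simplified illustration rather than its implementation.

.. code-block:: python

    from dataclasses import dataclass
    from typing import Optional


    class BudgetExceeded(RuntimeError):
        """Hypothetical stand-in for pydantic-ai's UsageLimitExceeded."""


    @dataclass
    class Limits:
        # Hypothetical stand-in mirroring the defaults described above:
        # request_limit defaults to 50; token caps default to None (no cap).
        request_limit: Optional[int] = 50
        input_tokens_limit: Optional[int] = None
        output_tokens_limit: Optional[int] = None


    def check(limits: Limits, requests: int, input_tokens: int, output_tokens: int) -> None:
        """Raise when any configured cap is exceeded; None disables a cap."""
        for name, used, cap in (
            ("requests", requests, limits.request_limit),
            ("input_tokens", input_tokens, limits.input_tokens_limit),
            ("output_tokens", output_tokens, limits.output_tokens_limit),
        ):
            if cap is not None and used > cap:
                raise BudgetExceeded(f"{name}: {used} > {cap}")


    token_only = Limits(input_tokens_limit=4_000)
    print(token_only.request_limit)  # 50: the request cap is still active

    explicit = Limits(request_limit=None, input_tokens_limit=4_000)
    check(explicit, requests=80, input_tokens=3_500, output_tokens=900)  # passes

    try:
        check(token_only, requests=80, input_tokens=3_500, output_tokens=900)
    except BudgetExceeded as exc:
        print(f"task would fail: {exc}")  # task would fail: requests: 80 > 50

The same 80-request run passes or fails depending only on whether ``request_limit`` was set to ``None`` explicitly.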

TaskFlow Decorator
------------------

The ``@task.llm`` decorator wraps ``LLMOperator``. The decorated function returns the prompt string; all other decorator arguments are passed to the operator:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_decorator_llm]
    :end-before: [END howto_decorator_llm]

With structured output:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_decorator_llm_structured]
    :end-before: [END howto_decorator_llm_structured]

Multimodal prompts
^^^^^^^^^^^^^^^^^^

``@task.llm`` accepts the same prompt shape as ``@task.agent``: the callable may return either a ``str`` or a non-empty ``Sequence[UserContent]`` (e.g., ``["Describe this:", ImageUrl(url="...")]``) for vision, audio, or document inputs. See :ref:`@task.agent multimodal prompts <howto/operator:agent-multimodal>` for the full example. ``require_approval=True`` is not currently supported with a ``Sequence`` prompt: the approval session model expects a string, so the task raises at the approval step. Widening that path is tracked as a follow-up.
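
If you build prompts in a shared helper, a pre-check along these lines can catch an unsupported shape before the task runs. ``validate_prompt`` is hypothetical, and plain strings stand in for ``UserContent`` parts such as ``ImageUrl``. Unlike the operator, which raises at the approval step, this sketch rejects a ``Sequence`` prompt combined with ``require_approval=True`` up front.

.. code-block:: python

    from collections.abc import Sequence


    def validate_prompt(prompt: object, require_approval: bool = False) -> object:
        """Hypothetical pre-check for the prompt shapes described above."""
        if isinstance(prompt, str):
            return prompt
        # bytes and bytearray are Sequences too, but not valid prompts here.
        if isinstance(prompt, (bytes, bytearray)) or not isinstance(prompt, Sequence):
            raise TypeError("prompt must be a str or a Sequence of content parts")
        if len(prompt) == 0:
            raise ValueError("a Sequence prompt must not be empty")
        if require_approval:
            raise ValueError("require_approval=True currently needs a str prompt")
        return prompt


    print(validate_prompt("Summarize this ticket."))
    # Strings stand in for UserContent parts such as ImageUrl(url=...).
    print(validate_prompt(["Describe this:", "https://example.com/chart.png"]))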

Classification with Literal ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Set ``output_type`` to a ``Literal`` to constrain the LLM to a fixed set of labels, which is useful for classification tasks:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
    :language: python
    :start-after: [START howto_decorator_llm_classification]
    :end-before: [END howto_decorator_llm_classification]
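
Because the allowed labels live in the ``Literal`` type itself, downstream code can read them back with ``typing.get_args`` instead of repeating them. In the sketch below, ``Sentiment``, ``route``, and the queue names are hypothetical; pydantic-ai validates the LLM's answer against the ``Literal`` on its own, so ``route`` only re-checks its input defensively.

.. code-block:: python

    from typing import Literal, get_args

    # Hypothetical label set; pass the same alias as output_type.
    Sentiment = Literal["positive", "negative", "neutral"]
    LABELS = get_args(Sentiment)  # ('positive', 'negative', 'neutral')


    def route(label: str) -> str:
        """Map a classification label to a hypothetical follow-up queue."""
        if label not in LABELS:
            raise ValueError(f"unexpected label {label!r}; expected one of {LABELS}")
        return {"positive": "thank-you", "negative": "escalate", "neutral": "archive"}[label]


    print(route("negative"))  # escalate

Keeping the label set in one alias means adding a label updates both the LLM constraint and the downstream check.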

Multi-task pipeline with dynamic mapping ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Combine ``@task.llm`` with upstream and downstream tasks. Use ``.expand()`` to process a list of items in parallel:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
    :language: python
    :start-after: [START howto_decorator_llm_pipeline]
    :end-before: [END howto_decorator_llm_pipeline]

Human-in-the-Loop Approval
--------------------------

Set ``require_approval=True`` to pause the task after the LLM generates its output and wait for a human reviewer to approve or reject it through the Airflow HITL interface. Optionally, set ``allow_modifications=True`` to let the reviewer edit the output before approving, and set a deadline with ``approval_timeout``:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_approval]
    :end-before: [END howto_operator_llm_approval]

Parameters
----------

- ``prompt``: The prompt to send to the LLM (operator) or the return value of the decorated function (decorator).
- ``llm_conn_id``: Airflow connection ID for the LLM provider.
- ``model_id``: Model identifier (e.g. ``"openai:gpt-5"``). Overrides the model set in the connection's extra field.
- ``system_prompt``: System-level instructions for the agent. Supports Jinja templating.
- ``output_type``: Expected output type (default: ``str``). Set to a Pydantic ``BaseModel`` subclass for structured output.
- ``serialize_output``: If ``True``, the operator calls ``model_dump()`` on a ``BaseModel`` output before pushing it to XCom. Default ``False``.
- ``agent_params``: Additional keyword arguments passed to the pydantic-ai ``Agent`` constructor (e.g. ``retries``, ``model_settings``, ``tools``). Supports Jinja templating.
- ``usage_limits``: Optional pydantic-ai ``UsageLimits`` enforced on the run. Fails the task when token, request, or tool-call budgets are exceeded. Default ``None``.
- ``require_approval``: If ``True``, the task defers after generating output and waits for human review. Default ``False``.
- ``approval_timeout``: Maximum time to wait for a review (``timedelta``). ``None`` means wait indefinitely. Default ``None``.
- ``allow_modifications``: If ``True``, the reviewer can edit the output before approving. Default ``False``.

Logging
-------

After each LLM call, the operator logs a summary with the model name, token usage, and request count at ``INFO`` level. At ``DEBUG`` level, it also logs the LLM output, truncated to 500 characters. See :ref:`AgentOperator logging <howto/operator:agent>` for details on the log format.