.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
.. _howto/operator:llm:

LLMOperator
===========

Use :class:`~airflow.providers.common.ai.operators.llm.LLMOperator` for
general-purpose LLM calls: summarization, extraction, classification,
structured output, or any prompt-based task.

The operator sends a prompt to an LLM via
:class:`~airflow.providers.common.ai.hooks.pydantic_ai.PydanticAIHook` and
returns the output as XCom.


.. seealso::
    :ref:`Connection configuration <howto/connection:pydanticai>`

Provide a prompt and the operator returns the LLM's response as a string:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_basic]
    :end-before: [END howto_operator_llm_basic]


Set ``output_type`` to a Pydantic ``BaseModel`` subclass. The LLM is instructed
to return structured data, and the model instance is pushed to XCom unchanged
so downstream tasks can type-hint the class directly
(``def downstream(result: MyModel)``) and use attribute access (``result.field``).

The declared ``output_type`` (and any ``BaseModel`` reachable from
``Union``/``Optional``/``list`` shapes) is registered for XCom deserialization by
the worker when it loads the DAG, before any task runs, so no edit to
``[core] allowed_deserialization_classes`` is needed. The Pydantic class must be
defined at module scope and bound to an attribute matching its ``__name__``;
classes nested inside a function or ``@dag``-decorated body, parameterized
generics, and dynamically-built classes whose ``__name__`` does not match the
attribute they are bound to cannot be re-imported, so they are skipped with a
warning at worker startup and the value fails to deserialize at the consumer.

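
The module-scope requirement above amounts to asking whether a class can be
found again from its module and its ``__name__``. The following is a minimal
sketch of such a check using only the standard library. ``is_reimportable`` is
a hypothetical helper written for illustration; it is not the provider's
registration code, and the real check may differ in detail.

.. code-block:: python

    import importlib


    def is_reimportable(cls: type) -> bool:
        """Return True if ``cls`` can be looked up again as ``module.__name__``."""
        # Classes defined inside a function carry "<locals>" in their qualified name.
        if "<locals>" in cls.__qualname__:
            return False
        try:
            module = importlib.import_module(cls.__module__)
        except ImportError:
            return False
        # The module attribute named after the class must be this exact class.
        return getattr(module, cls.__name__, None) is cls


    def build_nested_class() -> type:
        class Nested:  # defined in a function body, so it cannot be re-imported
            pass

        return Nested

A check along these lines could run in a unit test for the DAG module, so a
nested or renamed model class is caught before a downstream task fails to
deserialize it.
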

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_structured_output_class]
    :end-before: [END howto_operator_llm_structured_output_class]

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_structured]
    :end-before: [END howto_operator_llm_structured]


Registration covers downstream tasks in the same DAG: every worker walks the
loaded DAG's tasks at startup and registers each declared class, so it also works
for mapped producers (``.expand(...)``) and for workers that load DAGs from a
cache that bypasses operator construction.

The Airflow UI's XCom viewer renders Pydantic instances via the
``stringify`` path, which produces a representation like
``my_module.MyModel@version=1(field=value,...)`` without consulting the
allow-list. It is not pretty (no field-by-field rendering today), but the value
shows up; no configuration is required.

The remaining gap is cross-DAG ``xcom_pull``: a task in a different DAG
that pulls this XCom only parses its own DAG file, not the producer's, so the
class is not auto-registered. Add the class's qualified name to
``[core] allowed_deserialization_classes`` (or a glob that matches it) to make
that pattern work.

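
To illustrate how a glob entry can cover a class's qualified name, here is a
minimal sketch that uses the standard library's ``fnmatch`` as a stand-in for
Airflow's matcher. Treating the setting as a space-separated list of glob
patterns is an assumption of this sketch, and the module and class names
(``my_dags.models.Summary``, ``other_pkg.Report``) are hypothetical.

.. code-block:: python

    from fnmatch import fnmatch

    # Hypothetical value of [core] allowed_deserialization_classes,
    # assumed here to be a space-separated list of glob patterns.
    allow_list = "my_dags.models.* other_pkg.Report".split()


    def is_allowed(qualified_name: str, patterns: list[str]) -> bool:
        """Return True if any pattern matches the fully qualified class name."""
        return any(fnmatch(qualified_name, pattern) for pattern in patterns)


    # The producer DAG's model is covered by the "my_dags.models.*" glob.
    allowed = is_allowed("my_dags.models.Summary", allow_list)
    # A class outside the listed patterns is not covered.
    blocked = is_allowed("untrusted.module.Thing", allow_list)

A narrow pattern limits which classes may be imported during deserialization,
so the tightest glob that still matches the model is usually the better choice.
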

If a downstream consumer needs the dict shape (e.g. forwarding to an external
system that expects JSON-style payloads), pass ``serialize_output=True`` and the
operator calls ``model_dump()`` before pushing to XCom. This keeps the earlier
dict-based XCom payload available on demand without giving up the typed default.


Pass additional keyword arguments to the pydantic-ai ``Agent`` constructor
via ``agent_params``, for example ``retries``, ``model_settings``, or ``tools``.
See the `pydantic-ai Agent docs <https://ai.pydantic.dev/api/agent/>`__ for
the full list of supported parameters.

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_agent_params]
    :end-before: [END howto_operator_llm_agent_params]


Set ``usage_limits`` to a
`pydantic-ai UsageLimits <https://ai.pydantic.dev/api/usage/#pydantic_ai.usage.UsageLimits>`__
to fail the task when the run exceeds a configured budget: request count,
input/output tokens, or tool calls. The check happens inside pydantic-ai's
run loop, so the limit applies even when ``retries`` triggers multiple model
calls within a single task.

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_usage_limits]
    :end-before: [END howto_operator_llm_usage_limits]


Common knobs on ``UsageLimits``:

- ``request_limit``: max model requests per run (caps retry/tool-loop blow-ups).
  pydantic-ai applies a default of 50 when ``UsageLimits()`` is constructed
  without an explicit value, so passing ``UsageLimits(input_tokens_limit=4_000)``
  silently inherits that 50-request cap. Set ``request_limit=None`` to disable
  it explicitly when you only want a token cap.
- ``input_tokens_limit`` / ``output_tokens_limit``: per-run token caps.
- ``total_tokens_limit``: combined input + output cap.
- ``tool_calls_limit``: max tool invocations (``AgentOperator`` only).

When the limit is hit, pydantic-ai raises ``UsageLimitExceeded``, which
propagates to Airflow as a task failure. Airflow's standard retry policy
applies on top.

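
The inherited request cap described above is easy to miss, so the sketch below
models it with plain standard-library code. ``Budget`` and ``BudgetExceeded``
are hypothetical stand-ins for ``UsageLimits`` and ``UsageLimitExceeded``; they
only mirror the defaults stated in this section and are not pydantic-ai's
implementation.

.. code-block:: python

    from dataclasses import dataclass
    from typing import Optional


    class BudgetExceeded(Exception):
        """Stand-in for pydantic-ai's UsageLimitExceeded."""


    @dataclass
    class Budget:
        """Stand-in for UsageLimits with the default request cap of 50."""

        request_limit: Optional[int] = 50
        input_tokens_limit: Optional[int] = None

        def check(self, requests: int, input_tokens: int) -> None:
            # A limit of None means "no cap" for that dimension.
            if self.request_limit is not None and requests > self.request_limit:
                raise BudgetExceeded(f"requests {requests} > {self.request_limit}")
            if (
                self.input_tokens_limit is not None
                and input_tokens > self.input_tokens_limit
            ):
                raise BudgetExceeded(
                    f"input tokens {input_tokens} > {self.input_tokens_limit}"
                )


    def exceeds(budget: Budget, requests: int, input_tokens: int) -> bool:
        """Return True if the given usage would trip the budget."""
        try:
            budget.check(requests, input_tokens)
        except BudgetExceeded:
            return True
        return False


    # Only a token cap was requested, but the default request cap still applies.
    token_only = Budget(input_tokens_limit=4_000)
    # Disabling the request cap explicitly leaves just the token cap.
    token_only_explicit = Budget(request_limit=None, input_tokens_limit=4_000)

With these stand-ins, a run that makes 51 requests trips ``token_only`` but not
``token_only_explicit``, which is the difference ``request_limit=None`` makes.
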

The ``@task.llm`` decorator wraps ``LLMOperator``. The function returns the
prompt string; all other parameters are passed to the operator:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_decorator_llm]
    :end-before: [END howto_decorator_llm]

With structured output:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_decorator_llm_structured]
    :end-before: [END howto_decorator_llm_structured]


Multimodal prompts
^^^^^^^^^^^^^^^^^^

``@task.llm`` accepts the same prompt shape as ``@task.agent``: the callable
may return either a ``str`` or a non-empty ``Sequence[UserContent]`` (e.g.,
``["Describe this:", ImageUrl(url="...")]``) for vision, audio, or document
inputs. See :ref:`@task.agent multimodal prompts <howto/operator:agent-multimodal>` for
the full example. ``require_approval=True`` is not currently supported with a
``Sequence`` prompt (the approval session model expects a string) and will
raise at the approval boundary; widening that path is tracked as a follow-up.


Classification with ``Literal``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Set ``output_type`` to a ``Literal`` to constrain the LLM to a fixed set of
labels, which is useful for classification tasks:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
    :language: python
    :start-after: [START howto_decorator_llm_classification]
    :end-before: [END howto_decorator_llm_classification]

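
A ``Literal`` works as a label set because its allowed values can be read back
at runtime with ``typing.get_args``. The sketch below shows that with the
standard library only; the ``Priority`` labels and ``is_valid_label`` helper
are hypothetical, and this is not how the operator itself validates output.

.. code-block:: python

    from typing import Literal, get_args

    # Hypothetical label set for a ticket-triage classifier.
    Priority = Literal["low", "medium", "high"]

    # The allowed values are recoverable from the type at runtime.
    ALLOWED_LABELS = get_args(Priority)


    def is_valid_label(value: str) -> bool:
        """Return True if ``value`` is one of the labels declared in Priority."""
        return value in ALLOWED_LABELS

Downstream tasks can rely on the same declared set, for example to branch on
each label without handling free-form strings.
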

Multi-task pipeline with dynamic mapping
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Combine ``@task.llm`` with upstream and downstream tasks. Use ``.expand()``
to process a list of items in parallel:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
    :language: python
    :start-after: [START howto_decorator_llm_pipeline]
    :end-before: [END howto_decorator_llm_pipeline]


Set ``require_approval=True`` to pause the task after the LLM generates its
output and wait for a human reviewer to approve or reject it via the Airflow
HITL interface. Optionally allow the reviewer to edit the output before
approving with ``allow_modifications=True``, and set a deadline with
``approval_timeout``:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
    :language: python
    :start-after: [START howto_operator_llm_approval]
    :end-before: [END howto_operator_llm_approval]


- ``prompt``: The prompt to send to the LLM (operator) or the return value of the
  decorated function (decorator).
- ``llm_conn_id``: Airflow connection ID for the LLM provider.
- ``model_id``: Model identifier (e.g. ``"openai:gpt-5"``). Overrides the
  connection's extra field.
- ``system_prompt``: System-level instructions for the agent. Supports Jinja
  templating.
- ``output_type``: Expected output type (default: ``str``). Set to a Pydantic
  ``BaseModel`` for structured output.
- ``agent_params``: Additional keyword arguments passed to the pydantic-ai ``Agent``
  constructor (e.g. ``retries``, ``model_settings``, ``tools``). Supports Jinja
  templating.
- ``usage_limits``: Optional pydantic-ai ``UsageLimits`` enforced on the run. Fails
  the task when token / request / tool-call budgets are exceeded. Default ``None``.
- ``require_approval``: If ``True``, the task defers after generating output and
  waits for human review. Default ``False``.
- ``approval_timeout``: Maximum time to wait for a review (``timedelta``). ``None``
  means wait indefinitely. Default ``None``.
- ``allow_modifications``: If ``True``, the reviewer can edit the output before
  approving. Default ``False``.

After each LLM call, the operator logs a summary with model name, token usage,
and request count at INFO level. At DEBUG level, the LLM output is also logged
(truncated to 500 characters). See :ref:`AgentOperator: Logging <howto/operator:agent>`
for details on the log format.