docs/en/concepts/crews.mdx
A crew in crewAI represents a collaborative group of agents working together to achieve a set of tasks. Each crew defines the strategy for task execution, agent collaboration, and the overall workflow.
| Attribute | Parameters | Description |
|---|---|---|
| Tasks | tasks | A list of tasks assigned to the crew. |
| Agents | agents | A list of agents that are part of the crew. |
| Process (optional) | process | The process flow (e.g., sequential, hierarchical) the crew follows. Default is sequential. |
| Verbose (optional) | verbose | The verbosity level for logging during execution. Defaults to False. |
| Manager LLM (optional) | manager_llm | The language model used by the manager agent in a hierarchical process. Required when using a hierarchical process. |
| Function Calling LLM (optional) | function_calling_llm | If passed, the crew will use this LLM to do function calling for tools for all agents in the crew. Each agent can have its own LLM, which overrides the crew's LLM for function calling. |
| Config (optional) | config | Optional configuration settings for the crew, in Json or Dict[str, Any] format. |
| Max RPM (optional) | max_rpm | Maximum requests per minute the crew adheres to during execution. Defaults to None. |
| Memory (optional) | memory | Utilized for storing execution memories (short-term, long-term, entity memory). |
| Cache (optional) | cache | Specifies whether to use a cache for storing the results of tools' execution. Defaults to True. |
| Embedder (optional) | embedder | Configuration for the embedder to be used by the crew. Mostly used by memory for now. Default is {"provider": "openai"}. |
| Step Callback (optional) | step_callback | A function that is called after each step of every agent. This can be used to log the agent's actions or to perform other operations; it won't override the agent-specific step_callback. |
| Task Callback (optional) | task_callback | A function that is called after the completion of each task. Useful for monitoring or additional operations post-task execution. |
| Share Crew (optional) | share_crew | Whether you want to share the complete crew information and execution with the crewAI team to make the library better, and allow us to train models. |
| Output Log File (optional) | output_log_file | Set to True to save logs as logs.txt in the current directory or provide a file path. Logs will be in JSON format if the filename ends in .json, otherwise .txt. Defaults to None. |
| Manager Agent (optional) | manager_agent | Sets a custom agent to be used as the manager. |
| Prompt File (optional) | prompt_file | Path to the prompt JSON file to be used for the crew. |
| Planning (optional) | planning | Adds planning ability to the Crew. When activated before each Crew iteration, all Crew data is sent to an AgentPlanner that will plan the tasks and this plan will be added to each task description. |
| Planning LLM (optional) | planning_llm | The language model used by the AgentPlanner in a planning process. |
| Knowledge Sources (optional) | knowledge_sources | Knowledge sources available at the crew level, accessible to all the agents. |
| Stream (optional) | stream | Enable streaming output to receive real-time updates during crew execution. Returns a CrewStreamingOutput object that can be iterated for chunks. Defaults to False. |
| Chat LLM (optional) | chat_llm | The language model used to orchestrate crewai chat CLI interactions with the crew. Accepts a model name string or LLM instance. Defaults to None. |
| Before Kickoff Callbacks (optional) | before_kickoff_callbacks | A list of callable functions executed before the crew starts. Each callback receives and can modify the inputs dict. Distinct from the @before_kickoff decorator. Defaults to []. |
| After Kickoff Callbacks (optional) | after_kickoff_callbacks | A list of callable functions executed after the crew finishes. Each callback receives and can modify the CrewOutput. Distinct from the @after_kickoff decorator. Defaults to []. |
| Tracing (optional) | tracing | Controls OpenTelemetry tracing for the crew. True = always enable, False = always disable, None = inherit from environment / user settings. Defaults to None. |
| Skills (optional) | skills | A list of Path objects (skill search directories) or pre-loaded Skill objects applied to all agents in the crew. Defaults to None. |
| Security Config (optional) | security_config | A SecurityConfig instance managing crew fingerprinting and identity. Defaults to SecurityConfig(). |
| Checkpoint (optional) | checkpoint | Enables automatic checkpointing. Pass True for sensible defaults, a CheckpointConfig for full control, False to opt out, or None to inherit. See the Checkpointing section below. Defaults to None. |
There are two ways to create crews in CrewAI: using YAML configuration (recommended) or defining them directly in code.
Using YAML configuration provides a cleaner, more maintainable way to define crews and is consistent with how agents and tasks are defined in CrewAI projects.
After creating your CrewAI project as outlined in the Installation section, you can define your crew in a class that inherits from CrewBase and uses decorators to define agents, tasks, and the crew itself.
from crewai import Agent, Crew, Task, Process
from crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickoff
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
@CrewBase
class YourCrewName:
    """Description of your crew"""

    agents: List[BaseAgent]
    tasks: List[Task]

    # Paths to your YAML configuration files
    # To see an example agent and task defined in YAML, check out the following:
    # - Task: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
    # - Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
    agents_config = 'config/agents.yaml'
    tasks_config = 'config/tasks.yaml'

    @before_kickoff
    def prepare_inputs(self, inputs):
        # Modify inputs before the crew starts
        inputs['additional_data'] = "Some extra information"
        return inputs

    @after_kickoff
    def process_output(self, output):
        # Modify output after the crew finishes
        output.raw += "\nProcessed after kickoff."
        return output

    @agent
    def agent_one(self) -> Agent:
        return Agent(
            config=self.agents_config['agent_one'],  # type: ignore[index]
            verbose=True
        )

    @agent
    def agent_two(self) -> Agent:
        return Agent(
            config=self.agents_config['agent_two'],  # type: ignore[index]
            verbose=True
        )

    @task
    def task_one(self) -> Task:
        return Task(
            config=self.tasks_config['task_one']  # type: ignore[index]
        )

    @task
    def task_two(self) -> Task:
        return Task(
            config=self.tasks_config['task_two']  # type: ignore[index]
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,  # Automatically collected by the @agent decorator
            tasks=self.tasks,    # Automatically collected by the @task decorator
            process=Process.sequential,
            verbose=True,
        )
How to run the above code:
YourCrewName().crew().kickoff(inputs={"any": "input here"})
The CrewBase class, along with these decorators, automates the collection of agents and tasks, reducing the need for manual management.
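For reference, the config/agents.yaml and config/tasks.yaml files referenced above might look like the following. The names and wording are illustrative; the key convention (one top-level key per agent/task, matched by the method names in the crew class) is the part that matters:

```yaml
# config/agents.yaml
agent_one:
  role: Data Analyst
  goal: Analyze data trends in the market
  backstory: An experienced data analyst with a background in economics

# config/tasks.yaml
task_one:
  description: Collect recent market data and identify trends.
  expected_output: A report summarizing key trends in the market.
  agent: agent_one
```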
CrewAI provides several decorators in annotations.py that mark methods within your crew class for special handling:

- @CrewBase: Marks the class as a crew base class.
- @agent: Denotes a method that returns an Agent object.
- @task: Denotes a method that returns a Task object.
- @crew: Denotes the method that returns the Crew object.
- @before_kickoff: (Optional) Marks a method to be executed before the crew starts.
- @after_kickoff: (Optional) Marks a method to be executed after the crew finishes.

These decorators help organize your crew's structure and automatically collect agents and tasks without manually listing them.
Alternatively, you can define the crew directly in code without using YAML configuration files.
from crewai import Agent, Crew, Task, Process
from crewai_tools import YourCustomTool
class YourCrewName:
    def agent_one(self) -> Agent:
        return Agent(
            role="Data Analyst",
            goal="Analyze data trends in the market",
            backstory="An experienced data analyst with a background in economics",
            verbose=True,
            tools=[YourCustomTool()]
        )

    def agent_two(self) -> Agent:
        return Agent(
            role="Market Researcher",
            goal="Gather information on market dynamics",
            backstory="A diligent researcher with a keen eye for detail",
            verbose=True
        )

    def task_one(self) -> Task:
        return Task(
            description="Collect recent market data and identify trends.",
            expected_output="A report summarizing key trends in the market.",
            agent=self.agent_one()
        )

    def task_two(self) -> Task:
        return Task(
            description="Research factors affecting market dynamics.",
            expected_output="An analysis of factors influencing the market.",
            agent=self.agent_two()
        )

    def crew(self) -> Crew:
        return Crew(
            agents=[self.agent_one(), self.agent_two()],
            tasks=[self.task_one(), self.task_two()],
            process=Process.sequential,
            verbose=True
        )
How to run the above code:
YourCrewName().crew().kickoff(inputs={})
In this example, the tasks are executed sequentially, in the order they are listed.
The output of a crew in the CrewAI framework is encapsulated within the CrewOutput class.
This class provides a structured way to access results of the crew's execution, including various formats such as raw strings, JSON, and Pydantic models.
The CrewOutput includes the results from the final task output, token usage, and individual task outputs.
| Attribute | Parameters | Type | Description |
|---|---|---|---|
| Raw | raw | str | The raw output of the crew. This is the default format for the output. |
| Pydantic | pydantic | Optional[BaseModel] | A Pydantic model object representing the structured output of the crew. |
| JSON Dict | json_dict | Optional[Dict[str, Any]] | A dictionary representing the JSON output of the crew. |
| Tasks Output | tasks_output | List[TaskOutput] | A list of TaskOutput objects, each representing the output of a task in the crew. |
| Token Usage | token_usage | Dict[str, Any] | A summary of token usage, providing insights into the language model's performance during execution. |
| Method/Property | Description |
|---|---|
| json | Returns the JSON string representation of the crew output if the output format is JSON. |
| to_dict | Converts the JSON and Pydantic outputs to a dictionary. |
| __str__ | Returns the string representation of the crew output, prioritizing Pydantic, then JSON, then raw. |
Once a crew has been executed, its output can be accessed through the output attribute of the Crew object. The CrewOutput class provides various ways to interact with and present this output.
import json

# Example crew execution
crew = Crew(
    agents=[research_agent, writer_agent],
    tasks=[research_task, write_article_task],
    verbose=True
)
crew_output = crew.kickoff()

# Accessing the crew output
print(f"Raw Output: {crew_output.raw}")
if crew_output.json_dict:
    print(f"JSON Output: {json.dumps(crew_output.json_dict, indent=2)}")
if crew_output.pydantic:
    print(f"Pydantic Output: {crew_output.pydantic}")
print(f"Tasks Output: {crew_output.tasks_output}")
print(f"Token Usage: {crew_output.token_usage}")
You can see real-time logs of the crew execution by setting output_log_file to True (bool) or to a file name (str). Events are logged to file_name.txt, or to file_name.json if the file name ends in .json.
If output_log_file is True, logs are saved as logs.txt.
If output_log_file is False or None, no logs are written.
# Save crew logs
crew = Crew(output_log_file=True)               # Logs will be saved as logs.txt
crew = Crew(output_log_file="file_name")        # Logs will be saved as file_name.txt
crew = Crew(output_log_file="file_name.txt")    # Logs will be saved as file_name.txt
crew = Crew(output_log_file="file_name.json")   # Logs will be saved as file_name.json
Checkpointing lets a crew automatically save its state after key events (e.g. task completion) so that long-running or interrupted runs can be resumed exactly where they left off without re-executing completed tasks.
Pass checkpoint=True to enable checkpointing with sensible defaults (saves to .checkpoints/ after every task):
from crewai import Crew, Process
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    checkpoint=True,  # saves to .checkpoints/ after every task
)
crew.kickoff(inputs={"topic": "AI trends"})
Use CheckpointConfig for fine-grained control over location, trigger events, storage backend, and retention:
from crewai import Crew, Process
from crewai.state.checkpoint_config import CheckpointConfig
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    checkpoint=CheckpointConfig(
        location="./.checkpoints",      # directory for JSON files (default)
        on_events=["task_completed"],   # trigger after each task (default)
        max_checkpoints=5,              # keep only the 5 most recent checkpoints
    ),
)
crew.kickoff(inputs={"topic": "AI trends"})
Use Crew.from_checkpoint() to restore a crew from a saved checkpoint file, then call kickoff() to resume:
# Resume from the most recent checkpoint
crew = Crew.from_checkpoint(".checkpoints/latest.json")
crew.kickoff()
CheckpointConfig attributes:

| Attribute | Type | Default | Description |
|---|---|---|---|
| location | str | "./.checkpoints" | Storage destination. For JsonProvider this is a directory path; for SqliteProvider, a database file path. |
| on_events | list[str] | ["task_completed"] | Event types that trigger a checkpoint write. Use ["*"] to checkpoint on every event. |
| provider | JsonProvider \| SqliteProvider | JsonProvider() | Storage backend. Defaults to JsonProvider (plain JSON files). |
| max_checkpoints | int \| None | None | Maximum checkpoints to keep. Oldest are pruned after each write. None keeps all. |
Crews can utilize memory (short-term, long-term, and entity memory) to enhance their execution and learning over time. This feature allows crews to store and recall execution memories, aiding in decision-making and task execution strategies.
Caches can be employed to store the results of tools' execution, making the process more efficient by reducing the need to re-execute identical tasks.
After the crew execution, you can access the usage_metrics attribute to view the language model (LLM) usage metrics for all tasks executed by the crew. This provides insights into operational efficiency and areas for improvement.
# Access the crew's usage metrics
crew = Crew(agents=[agent1, agent2], tasks=[task1, task2])
crew.kickoff()
print(crew.usage_metrics)
manager_llm or manager_agent is required for the hierarchical process and is essential for validating the process flow.
Once your crew is assembled, initiate the workflow with the appropriate kickoff method. CrewAI provides several methods for better control over the kickoff process.
- kickoff(): Starts the execution process according to the defined process flow.
- kickoff_for_each(): Executes the process sequentially for each input in a collection.

CrewAI offers two approaches for async execution:
| Method | Type | Description |
|---|---|---|
| akickoff() | Native async | True async/await throughout the entire execution chain |
| akickoff_for_each() | Native async | Native async execution for each input in a list |
| kickoff_async() | Thread-based | Wraps synchronous execution in asyncio.to_thread |
| kickoff_for_each_async() | Thread-based | Thread-based async for each input in a list |
# Start the crew's task execution
result = my_crew.kickoff()
print(result)
# Example of using kickoff_for_each
inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]
results = my_crew.kickoff_for_each(inputs=inputs_array)
for result in results:
    print(result)
# Example of using native async with akickoff
inputs = {'topic': 'AI in healthcare'}
async_result = await my_crew.akickoff(inputs=inputs)
print(async_result)
# Example of using native async with akickoff_for_each
inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]
async_results = await my_crew.akickoff_for_each(inputs=inputs_array)
for async_result in async_results:
    print(async_result)
# Example of using thread-based kickoff_async
inputs = {'topic': 'AI in healthcare'}
async_result = await my_crew.kickoff_async(inputs=inputs)
print(async_result)
# Example of using thread-based kickoff_for_each_async
inputs_array = [{'topic': 'AI in healthcare'}, {'topic': 'AI in finance'}]
async_results = await my_crew.kickoff_for_each_async(inputs=inputs_array)
for async_result in async_results:
    print(async_result)
These methods provide flexibility in how you manage and execute tasks within your crew, allowing for both synchronous and asynchronous workflows tailored to your needs. For detailed async examples, see the Kickoff Crew Asynchronously guide.
For real-time visibility into crew execution, you can enable streaming to receive output as it's generated:
# Enable streaming
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True
)

# Iterate over streaming output
streaming = crew.kickoff(inputs={"topic": "AI"})
for chunk in streaming:
    print(chunk.content, end="", flush=True)
# Access final result
result = streaming.result
Learn more about streaming in the Streaming Crew Execution guide.
The replay feature in CrewAI allows you to replay from a specific task using the command-line interface (CLI). Running crewai replay -t <task_id> specifies the task_id to replay from.
Kickoffs save the task outputs of the latest run locally so that you can replay from them.
To use the replay feature, follow these steps:
To view the latest kickoff task IDs, use:
crewai log-tasks-outputs
Then, to replay from a specific task, use:
crewai replay -t <task_id>
These commands let you replay from your latest kickoff tasks, still retaining context from previously executed tasks.