Checkpointing

<Warning> Checkpointing is in early release. APIs may change in future versions. </Warning>

Overview

Checkpointing automatically saves execution state during a run. If a crew, flow, or agent fails mid-execution, you can restore from the last checkpoint and resume without re-running completed work.

Quick Start

```python
from crewai import Crew, CheckpointConfig

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=True,  # uses defaults: ./.checkpoints, on task_completed
)
result = crew.kickoff()
```

Checkpoint files are written to ./.checkpoints/ after each completed task.

Configuration

Use CheckpointConfig for full control:

```python
from crewai import Crew, CheckpointConfig

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=CheckpointConfig(
        location="./my_checkpoints",
        on_events=["task_completed", "crew_kickoff_completed"],
        max_checkpoints=5,
    ),
)
```

CheckpointConfig Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `location` | `str` | `"./.checkpoints"` | Storage destination: a directory for `JsonProvider`, a database file path for `SqliteProvider` |
| `on_events` | `list[str]` | `["task_completed"]` | Event types that trigger a checkpoint |
| `provider` | `BaseProvider` | `JsonProvider()` | Storage backend |
| `max_checkpoints` | `int \| None` | `None` | Maximum number of checkpoints to keep. The oldest are pruned after each write. Pruning is handled by the provider. |
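
To make the `max_checkpoints` pruning rule concrete, here is a minimal standard-library sketch of "keep the newest N files, delete the rest". It is not CrewAI's implementation: `prune_oldest` is a hypothetical helper, and it assumes file names begin with a sortable timestamp so that alphabetical order matches chronological order.

```python
import tempfile
from pathlib import Path


def prune_oldest(directory: Path, max_checkpoints: int) -> list[Path]:
    """Delete the oldest *.json files so at most max_checkpoints remain.

    Hypothetical helper: assumes names start with a sortable timestamp,
    so lexicographic order equals chronological order.
    """
    files = sorted(directory.glob("*.json"))  # oldest first
    excess = files[: max(0, len(files) - max_checkpoints)]
    for path in excess:
        path.unlink()
    return excess


with tempfile.TemporaryDirectory() as tmp:
    cp_dir = Path(tmp)
    # Three fake checkpoint files, one minute apart.
    for stamp in ("20260407T120000", "20260407T120100", "20260407T120200"):
        (cp_dir / f"{stamp}_example.json").write_text("{}")

    removed = [p.name for p in prune_oldest(cp_dir, max_checkpoints=2)]
    remaining = sorted(p.name for p in cp_dir.glob("*.json"))
```

With three files and a limit of two, only the earliest file is removed.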

Inheritance and Opt-Out

The checkpoint field on Crew, Flow, and Agent accepts CheckpointConfig, True, False, or None:

| Value | Behavior |
| --- | --- |
| `None` (default) | Inherit from parent. An agent inherits its crew's config. |
| `True` | Enable with defaults. |
| `False` | Explicit opt-out. Stops inheritance from parent. |
| `CheckpointConfig(...)` | Custom configuration. |

```python
from crewai import Agent, Crew

crew = Crew(
    agents=[
        Agent(role="Researcher", ...),                  # inherits crew's checkpoint
        Agent(role="Writer", ..., checkpoint=False),     # opted out, no checkpoints
    ],
    tasks=[...],
    checkpoint=True,
)
```
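
The four accepted values can be read as a small resolution rule. The sketch below models that rule in plain Python so the inherit and opt-out cases are easy to see. `SketchConfig` and `resolve` are hypothetical names invented for this illustration and are not part of CrewAI.

```python
from dataclasses import dataclass, field
from typing import Optional, Union


@dataclass
class SketchConfig:
    """Hypothetical stand-in for CheckpointConfig, reduced to two fields."""
    location: str = "./.checkpoints"
    on_events: list = field(default_factory=lambda: ["task_completed"])


CheckpointSetting = Union[SketchConfig, bool, None]


def resolve(own: CheckpointSetting, parent: Optional[SketchConfig]) -> Optional[SketchConfig]:
    """Return the effective config for a child, or None when checkpointing is off."""
    if own is None:
        return parent           # inherit whatever the parent resolved to
    if own is False:
        return None             # explicit opt-out stops inheritance
    if own is True:
        return SketchConfig()   # enable with defaults
    return own                  # custom configuration


crew_cfg = resolve(SketchConfig(location="./crew_cp"), parent=None)
researcher_cfg = resolve(None, parent=crew_cfg)   # inherits the crew's config
writer_cfg = resolve(False, parent=crew_cfg)      # opted out
```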

Resuming from a Checkpoint

```python
from crewai import Crew

# Restore and resume
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff()  # resumes after the last completed task
```

The restored crew skips already-completed tasks and resumes from the first incomplete one.
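
The skip-and-resume behavior can be pictured with a small sketch. The JSON layout and the `remaining_tasks` helper are invented for illustration and do not reflect CrewAI's actual checkpoint schema.

```python
import json


def remaining_tasks(task_names: list[str], completed: set[str]) -> list[str]:
    """Return the tasks from the first incomplete one onward, in order."""
    for index, name in enumerate(task_names):
        if name not in completed:
            return task_names[index:]
    return []


# Pretend this JSON came from a checkpoint written after the first task finished.
saved = json.dumps({"completed": ["research"]})
state = json.loads(saved)

todo = remaining_tasks(["research", "write", "review"], set(state["completed"]))
```

Here `todo` holds only the write and review tasks, so the research step is not repeated.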

Works on Crew, Flow, and Agent

Crew

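```python
from crewai import Crew, CheckpointConfig

# researcher, writer, and the three tasks are assumed to be defined elsewhere
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task, review_task],
    checkpoint=CheckpointConfig(location="./crew_cp"),
)
```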

Default trigger: task_completed (one checkpoint per finished task).

Flow

```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig

class MyFlow(Flow):
    @start()
    def step_one(self):
        return "data"

    @listen(step_one)
    def step_two(self, data):
        # process() is a placeholder for your own logic
        return process(data)

flow = MyFlow(
    checkpoint=CheckpointConfig(
        location="./flow_cp",
        on_events=["method_execution_finished"],
    ),
)
result = flow.kickoff()

# Resume
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()
```

Agent

```python
from crewai import Agent, CheckpointConfig

agent = Agent(
    role="Researcher",
    goal="Research topics",
    backstory="Expert researcher",
    checkpoint=CheckpointConfig(
        location="./agent_cp",
        on_events=["lite_agent_execution_completed"],
    ),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```

Storage Providers

CrewAI ships with two checkpoint storage providers.

JsonProvider (default)

Writes each checkpoint as a separate JSON file. Simple, human-readable, easy to inspect.

```python
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=CheckpointConfig(
        location="./my_checkpoints",
        provider=JsonProvider(),       # this is the default
        max_checkpoints=5,             # prunes oldest files
    ),
)
```

Files are named <timestamp>_<uuid>.json inside the location directory.
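
Because each file name starts with a timestamp, one way to pick the file to resume from is to sort the directory listing by name. The sketch below assumes the timestamp prefix sorts chronologically, as the example name `20260407T120000_abc123.json` suggests. `latest_checkpoint` is a hypothetical helper, not a CrewAI function.

```python
import tempfile
from pathlib import Path
from typing import Optional


def latest_checkpoint(directory: Path) -> Optional[Path]:
    """Return the newest *.json file by name, or None if there are none."""
    files = sorted(directory.glob("*.json"))
    return files[-1] if files else None


with tempfile.TemporaryDirectory() as tmp:
    cp_dir = Path(tmp)
    # Two fake checkpoint files; the second is 15 minutes newer.
    (cp_dir / "20260407T120000_aaa111.json").write_text("{}")
    (cp_dir / "20260407T121500_bbb222.json").write_text("{}")

    newest = latest_checkpoint(cp_dir)
    newest_name = newest.name if newest else None
```

The resulting path is the kind of value passed to `Crew.from_checkpoint(...)`.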

SqliteProvider

Stores all checkpoints in a single SQLite database file. It is better suited to high-frequency checkpointing and avoids creating many small files.

```python
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider

crew = Crew(
    agents=[...],
    tasks=[...],
    checkpoint=CheckpointConfig(
        location="./.checkpoints.db",
        provider=SqliteProvider(),
        max_checkpoints=50,
    ),
)
```

WAL journal mode is enabled for concurrent read access.
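
For readers unfamiliar with WAL, the following sketch uses Python's built-in `sqlite3` module to enable it and to keep only the newest rows after each write. The table name, columns, and pruning query are assumptions made for illustration and are not the schema `SqliteProvider` uses.

```python
import sqlite3
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    db_path = Path(tmp) / "checkpoints.db"
    conn = sqlite3.connect(db_path)

    # Switch to write-ahead logging; the pragma returns the active mode.
    journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]

    # Hypothetical schema, for illustration only.
    conn.execute(
        "CREATE TABLE checkpoints ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)"
    )

    max_checkpoints = 3
    for step in range(5):
        conn.execute(
            "INSERT INTO checkpoints (payload) VALUES (?)",
            (f'{{"step": {step}}}',),
        )
        # Keep only the newest max_checkpoints rows after each write.
        conn.execute(
            "DELETE FROM checkpoints WHERE id NOT IN "
            "(SELECT id FROM checkpoints ORDER BY id DESC LIMIT ?)",
            (max_checkpoints,),
        )
    conn.commit()

    kept = [row[0] for row in conn.execute("SELECT payload FROM checkpoints ORDER BY id")]
    conn.close()
```

In WAL mode, readers do not block the writer and the writer does not block readers, which is why it suits frequent checkpoint writes alongside inspection queries.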

Event Types

The on_events field accepts any combination of event type strings. Common choices:

| Use Case | Events |
| --- | --- |
| After each task (Crew) | `["task_completed"]` |
| After each flow method | `["method_execution_finished"]` |
| After agent execution | `["agent_execution_completed"]`, `["lite_agent_execution_completed"]` |
| On crew completion only | `["crew_kickoff_completed"]` |
| After every LLM call | `["llm_call_completed"]` |
| On everything | `["*"]` |

<Warning> Using `["*"]` or high-frequency events like `llm_call_completed` will write many checkpoint files and may impact performance. Use `max_checkpoints` to limit disk usage. </Warning>
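
To see why the wildcard is costly, the sketch below counts how many events in a short stream would trigger a write under two settings. The exact-match-plus-`"*"` rule in `should_checkpoint` is an assumption about how matching works, written only to illustrate the trade-off.

```python
def should_checkpoint(event_type: str, on_events: list[str]) -> bool:
    """True when event_type is listed in on_events or the wildcard "*" is present."""
    return "*" in on_events or event_type in on_events


# A made-up sequence of event type strings from one short run.
stream = [
    "llm_call_completed",
    "task_completed",
    "llm_call_completed",
    "crew_kickoff_completed",
]

per_task = [e for e in stream if should_checkpoint(e, ["task_completed"])]
everything = [e for e in stream if should_checkpoint(e, ["*"])]
```

In this stream, `["task_completed"]` triggers one write while `["*"]` triggers four.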

Manual Checkpointing

For full control, register your own event handler and call state.checkpoint() directly:

```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent

# Sync handler
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
    path = state.checkpoint("./my_checkpoints")
    print(f"Saved checkpoint: {path}")

# Async handler
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
    path = await state.acheckpoint("./my_checkpoints")
    print(f"Saved checkpoint: {path}")
```

The state argument is the RuntimeState passed automatically by the event bus when your handler accepts 3 parameters. You can register handlers on any event type listed in the Event Listeners documentation.
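
One plausible way a dispatcher can decide whether to pass `state` is to inspect the handler's signature. The sketch below shows that idea with the standard `inspect` module. It is an assumption about the mechanism, not the event bus's actual code, and `dispatch` is a hypothetical function.

```python
import inspect


def dispatch(handler, source, event, state):
    """Call handler with state only if it declares at least three parameters."""
    param_count = len(inspect.signature(handler).parameters)
    if param_count >= 3:
        return handler(source, event, state)
    return handler(source, event)


def two_arg(source, event):
    return ("no state", event)


def three_arg(source, event, state):
    return ("with state", state)


two_result = dispatch(two_arg, "crew", "task_completed", {"step": 1})
three_result = dispatch(three_arg, "crew", "task_completed", {"step": 1})
```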

Checkpointing is best-effort: if a checkpoint write fails, the error is logged but execution continues uninterrupted.
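
The best-effort pattern amounts to catching the write error, logging it, and carrying on. A minimal sketch follows. The `best_effort` wrapper and both write functions are hypothetical and only illustrate the pattern.

```python
import logging

logger = logging.getLogger("checkpoint_sketch")


def best_effort(write_checkpoint, *args, **kwargs):
    """Run write_checkpoint; log and swallow any exception, returning None on failure."""
    try:
        return write_checkpoint(*args, **kwargs)
    except Exception:
        logger.exception("Checkpoint write failed; continuing execution")
        return None


def failing_write(location):
    # Simulates a storage error such as a read-only directory.
    raise OSError(f"cannot write to {location}")


def working_write(location):
    return f"{location}/20260407T120000_abc123.json"


failed = best_effort(failing_write, "/read-only")
saved = best_effort(working_write, "./my_checkpoints")
```

A failed write therefore costs only the ability to resume from that point; the run itself proceeds.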