docs/v3/examples/run-api-sourced-etl.mdx
{/*
This page is automatically generated via the generate_example_pages.py script. Any changes to this page will be overwritten.
*/}
<a href="https://github.com/PrefectHQ/prefect/blob/main/examples/run_api_sourced_etl.py" target="_blank">View on GitHub</a>
Prefect turns everyday Python into production-grade workflows with zero boilerplate.
When you pair Prefect with pandas, you get a versatile ETL toolkit.
The result? You spend your time thinking about what you want to build, not how to keep it alive. Point this trio at any API, database, or file system and it will move the data where you need it while handling the messy details for you.
In this article you will build a small ETL pipeline: extract articles from a public API, transform the JSON into a pandas DataFrame, and load the result to CSV.

This example demonstrates these Prefect features:

- `@task` – wrap any function in retries & observability.
- `log_prints` – surface `print()` logs automatically.

Your data team wants engagement metrics from Dev.to articles, daily. You need a quick, reliable pipeline that anyone can run locally and later schedule in Prefect Cloud.
Write three small Python functions (extract, transform, load), add two decorators, and let Prefect handle retries, concurrency, and logging. No framework-specific hoops, just Python the way you already write it.
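As a rough sketch of what `@task(retries=3, retry_delay_seconds=[2, 5, 15])` saves you from writing, here is the hand-rolled retry loop the decorator replaces (plain Python; the function name is illustrative, not part of Prefect's API):

```python
import time


def with_retries(fn, retries=3, delays=(2, 5, 15)):
    """Call fn(); on failure, wait delays[i] seconds and try again,
    up to `retries` extra attempts - roughly what the decorator does."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # out of attempts: surface the last error
            time.sleep(delays[attempt])
```

With Prefect, the decorator handles this for you and adds logging, state tracking, and observability in the UI.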
For more background on Prefect's design philosophy, check out our blog post: _Built to Fail: Design Patterns for Resilient Data Pipelines_
Watch as Prefect orchestrates the ETL pipeline with automatic retries and logging. The flow fetches multiple pages of articles, transforms them into a structured DataFrame, and saves the results to CSV. This pattern is highly adaptable: use it to build pipelines that move data between any sources and destinations:
- `fetch_page` task – Downloads a single page with retries.
- `to_dataframe` task – Normalises JSON to a pandas DataFrame.
- `save_csv` task – Persists the DataFrame and logs a peek.
- `etl` flow – Orchestrates the tasks sequentially for clarity.
- `if __name__ == "__main__"` with some basic configuration kicks things off.

```python
from __future__ import annotations

from pathlib import Path
from typing import Any

import httpx
import pandas as pd

from prefect import flow, task


@task(retries=3, retry_delay_seconds=[2, 5, 15])
def fetch_page(page: int, api_base: str, per_page: int) -> list[dict[str, Any]]:
    """Return a list of article dicts for a given page number."""
    url = f"{api_base}/articles"
    params = {"page": page, "per_page": per_page}
    print(f"Fetching page {page} …")
    response = httpx.get(url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()


@task
def to_dataframe(raw_articles: list[list[dict[str, Any]]]) -> pd.DataFrame:
    """Flatten & normalise JSON into a tidy DataFrame."""
    # Combine pages, then select the fields we care about
    records = [article for page in raw_articles for article in page]
    df = pd.json_normalize(records)[
        [
            "id",
            "title",
            "published_at",
            "url",
            "comments_count",
            "positive_reactions_count",
            "tag_list",
            "user.username",
        ]
    ]
    return df


@task
def save_csv(df: pd.DataFrame, path: Path) -> None:
    """Persist DataFrame to disk, then log a preview."""
    df.to_csv(path, index=False)
    print(f"Saved {len(df)} rows ➜ {path}\n\nPreview:\n{df.head()}\n")


@flow(name="devto_etl", log_prints=True)
def etl(api_base: str, pages: int, per_page: int, output_file: Path) -> None:
    """Run the end-to-end ETL for *pages* of articles."""
    # Extract – simple loop for clarity
    raw_pages: list[list[dict[str, Any]]] = []
    for page_number in range(1, pages + 1):
        raw_pages.append(fetch_page(page_number, api_base, per_page))

    # Transform
    df = to_dataframe(raw_pages)

    # Load
    save_csv(df, output_file)


if __name__ == "__main__":
    # Configuration – tweak to taste
    api_base = "https://dev.to/api"
    pages = 3  # Number of pages to fetch
    per_page = 30  # Articles per page (max 30 per API docs)
    output_file = Path("devto_articles.csv")

    etl(api_base=api_base, pages=pages, per_page=per_page, output_file=output_file)
```

Run the example:

```bash
python 01_getting_started/03_run_api_sourced_etl.py
```
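The `user.username` column selected in `to_dataframe` exists because `pd.json_normalize` flattens nested JSON objects into dot-separated column names. A minimal sketch with made-up records (the real API returns many more fields):

```python
import pandas as pd

# Two fake article records shaped like the Dev.to API response (illustrative only)
records = [
    {"id": 1, "title": "Intro to Prefect", "user": {"username": "alice"}},
    {"id": 2, "title": "pandas tips", "user": {"username": "bob"}},
]

df = pd.json_normalize(records)
# Nested "user" fields become dot-separated columns
print(df.columns.tolist())  # ['id', 'title', 'user.username']
print(df["user.username"].tolist())  # ['alice', 'bob']
```

This is why the column list in `to_dataframe` can mix top-level fields like `id` with the nested `user.username` in a single selection.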
- Prefect ran the three tasks (`fetch_page`, `to_dataframe`, `save_csv`) as part of the flow.
- Each `fetch_page` call downloaded a page and, if it failed, would automatically retry.
- The `log_prints=True` flag logs messages inside the flow body; prints inside tasks are displayed in the console.
- Swap `save_csv` for a database loader or S3 upload with one small change.
- Import the `etl` flow and run it with different parameters from another flow.

Prefect lets you focus on data, not orchestration plumbing – happy ETL-ing! 🎉