docs/v3/examples/atproto-dashboard-with-prefect-assets.mdx
{/*
This page is automatically generated via the generate_example_pages.py script. Any changes to this page will be overwritten.
*/}
<a href="https://github.com/zzstoatzz/atproto-dashboard" target="_blank">View full project on GitHub</a>
Build data pipelines with Prefect Assets – declarative, dependency-aware, and observable.
This example demonstrates how to use Prefect Assets to build a social media analytics pipeline. The full implementation with ATProto integration, dbt transformations, and a Streamlit dashboard is available at: https://github.com/zzstoatzz/atproto-dashboard
The heart of this example is the `@materialize` decorator, which transforms ordinary functions into versioned, cacheable data assets instead of requiring you to manually manage data dependencies and storage. Assets are identified by unique keys (e.g., S3 paths), and dependencies that aren't expressed through function arguments can be declared with `asset_deps`.

This simplified example demonstrates the core patterns. For the complete implementation:
```bash
git clone https://github.com/zzstoatzz/atproto-dashboard
cd atproto-dashboard
# Follow README for setup and configuration
```
Assets represent data products in your pipeline. Each asset has a unique key and a materialization function decorated with `@materialize`. Define assets with descriptive keys:
```python
import json
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import Any

from prefect import flow
from prefect.artifacts import create_markdown_artifact
from prefect.assets import Asset, materialize

raw_data_asset = Asset(key="pipeline://raw_data")
processed_data_asset = Asset(key="pipeline://processed_data")
analytics_asset = Asset(key="pipeline://analytics")
```
The first asset fetches data from an external source. In the full implementation, this connects to the ATProto/Bluesky API to fetch social media data.
```python
@materialize(raw_data_asset)
def fetch_raw_data() -> dict[str, Any]:
    """Fetch raw data from an external source."""
    print("Fetching raw data...")
    data = {
        "items": ["item1", "item2", "item3"],
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "count": 3,
    }
    print(f"✓ Fetched {data['count']} items")
    return data
```
This asset demonstrates automatic dependency tracking. By accepting `raw_data` as a parameter, Prefect knows this asset depends on `raw_data_asset` and ensures it's materialized first. In production, this would store data to S3 with partitioning; here we use local storage for simplicity.
```python
@materialize(processed_data_asset)
def process_data(raw_data: dict[str, Any]) -> dict[str, Any]:
    """Process raw data into a structured format with automatic dependency tracking."""
    print(f"Processing {raw_data['count']} items...")
    processed = {
        "items": [item.upper() for item in raw_data["items"]],
        "processed_at": datetime.now(timezone.utc).isoformat(),
        "source_count": raw_data["count"],
    }
    storage_dir = Path("./data")
    storage_dir.mkdir(exist_ok=True)
    with open(storage_dir / "processed.json", "w") as f:
        json.dump(processed, f, indent=2)
    print(f"✓ Processed and stored {len(processed['items'])} items")
    return processed
```
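The local JSON write above stands in for the partitioned S3 storage used in production. As a sketch of what a date-partitioned object key might look like (the prefix and bucket names are hypothetical):

```python
from datetime import datetime, timezone


def partitioned_key(prefix: str, name: str, when: datetime) -> str:
    """Build a date-partitioned object key like raw/2024/01/15/data.json."""
    return f"{prefix}/{when:%Y/%m/%d}/{name}"


key = partitioned_key("raw", "data.json", datetime(2024, 1, 15, tzinfo=timezone.utc))
# -> "raw/2024/01/15/data.json", usable in an asset key such as
# "s3://my-bucket/raw/2024/01/15/data.json"
```

Encoding the partition into the asset key keeps each day's materialization independently addressable in the asset graph.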
This asset demonstrates chained dependencies (it depends on `processed_data`, which depends on `raw_data`) and artifact creation for rich observability in the Prefect UI. In the full implementation, this step runs dbt transformations to create analytics models.
```python
@materialize(analytics_asset)
def create_analytics(processed_data: dict[str, Any]) -> dict[str, Any]:
    """Generate analytics with chained dependencies and create UI artifacts."""
    print("Creating analytics...")
    analytics = {
        "total_items": len(processed_data["items"]),
        "source_timestamp": processed_data["processed_at"],
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    create_markdown_artifact(
        key="analytics-summary",
        markdown=dedent(
            f"""
            # Analytics Summary

            - **Total Items**: {analytics["total_items"]}
            - **Created**: {analytics["created_at"]}
            - **Source**: {analytics["source_timestamp"]}

            This artifact appears in the Prefect UI for observability.
            """
        ),
        description="Analytics summary for this pipeline run",
    )
    print(f"✓ Analytics created for {analytics['total_items']} items")
    return analytics
```
The flow calls each asset function in sequence, and Prefect handles dependency tracking, execution order, caching, and observability:
```python
@flow(name="asset-pipeline-demo", log_prints=True)
def run_asset_pipeline() -> dict[str, Any]:
    """
    Orchestrate the asset pipeline.

    By calling the materialization functions in sequence and passing results,
    Prefect automatically:
    - Tracks dependencies between assets
    - Ensures execution order
    - Provides observability in the UI
    - Enables caching and versioning
    """
    print("🚀 Starting asset pipeline")

    # Materialize assets - Prefect tracks dependencies automatically
    raw = fetch_raw_data()
    processed = process_data(raw)
    analytics = create_analytics(processed)

    print(f"✅ Pipeline complete! Processed {analytics['total_items']} items")
    return analytics


if __name__ == "__main__":
    run_asset_pipeline()
```
Beyond this example, Prefect Assets provide:

- **Automatic dependency tracking**
- **Caching and versioning**
- **Storage integration** with cloud providers via `prefect-aws`, `prefect-gcp`, etc.
- **Observability** in the Prefect UI
- **Production readiness**
This example demonstrates the core patterns; the complete implementation adds ATProto data ingestion, dbt transformations, and a Streamlit dashboard. See the full project at: https://github.com/zzstoatzz/atproto-dashboard