metadata-ingestion/src/datahub/ingestion/recording/README.md
**Beta Feature:** Recording and replay is currently in beta. The feature is stable for debugging purposes, but the archive format may change in future releases.
Debug ingestion issues by capturing all external I/O (HTTP requests and database queries) during ingestion runs, then replaying them locally in an air-gapped environment with full debugger support.
The recording system captures:

- All HTTP requests and responses (via VCR.py cassettes)
- Database queries and their results
- The ingestion recipe, with secrets redacted
- Run metadata (versions, timestamps, checksums)
Recordings are stored in encrypted, compressed archives that can be replayed offline to reproduce issues exactly as they occurred in production.
The recorded and replayed MCPs are semantically identical: they contain the same source data. However, certain metadata fields will differ because they reflect when MCPs are emitted, not the source data itself:

- `systemMetadata.lastObserved` - timestamp of MCP emission
- `systemMetadata.runId` - unique run identifier
- `auditStamp.time` - audit timestamp

Use `datahub check metadata-diff` to compare recordings semantically:
```bash
# Compare MCPs ignoring system metadata
datahub check metadata-diff \
  --ignore-path "root['*']['systemMetadata']['lastObserved']" \
  --ignore-path "root['*']['systemMetadata']['runId']" \
  recording_output.json replay_output.json
```
A successful replay will show `PERFECT SEMANTIC MATCH` when ignoring these fields.
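If you want the same comparison programmatically, the idea maps cleanly onto a generic diff library. Below is a minimal sketch using deepdiff (an assumption for illustration; the internals of `datahub check metadata-diff` may differ), treating both outputs as JSON lists of MCPs:

```python
import json

from deepdiff import DeepDiff  # pip install deepdiff

with open("recording_output.json") as f:
    recorded = json.load(f)
with open("replay_output.json") as f:
    replayed = json.load(f)

# Ignore fields that legitimately differ between runs
# (emission timestamps and run identifiers).
diff = DeepDiff(
    recorded,
    replayed,
    ignore_order=True,
    exclude_regex_paths=[
        r"root\[\d+\]\['systemMetadata'\]\['lastObserved'\]",
        r"root\[\d+\]\['systemMetadata'\]\['runId'\]",
        r"root\[\d+\]\['auditStamp'\]\['time'\]",
    ],
)
print("semantic match" if not diff else diff)
```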
Install the optional `debug-recording` plugin:

```bash
pip install 'acryl-datahub[debug-recording]'

# Or with your source connectors
pip install 'acryl-datahub[looker,debug-recording]'
```
Dependencies:

- `vcrpy>=8.0.0` (Python 3.10+) or `vcrpy>=7.0.0,<8.0.0` (Python 3.9) - HTTP recording/replay
- `pyzipper>=0.3.6` - AES-256 encrypted archives

Note: The recording module uses lazy imports to avoid requiring optional dependencies (like `sqlalchemy`) when recording is not used. This means you can install the recording plugin without pulling in database connector dependencies unless you actually use them.
```bash
# Record with password protection
datahub ingest run -c recipe.yaml --record --record-password mysecret

# Record without S3 upload (for local testing)
datahub ingest run -c recipe.yaml --record --record-password mysecret --no-s3-upload
```
The recording creates an encrypted ZIP archive containing:

- `manifest.json` - metadata, versions, and checksums
- `recipe.yaml` - the recipe with secrets redacted
- `http/cassette.yaml` - recorded HTTP interactions
- `db/queries.jsonl` - recorded database queries
```bash
# Replay in air-gapped mode (default) - no network required
datahub ingest replay recording.zip --password mysecret

# Replay with live sink - replay source data, emit to real DataHub
datahub ingest replay recording.zip --password mysecret \
  --live-sink --server http://localhost:8080
```
```bash
# View archive metadata
datahub recording info recording.zip --password mysecret

# Extract archive contents
datahub recording extract recording.zip --password mysecret --output-dir ./extracted

# List contents of a recording archive
datahub recording list recording.zip --password mysecret
```
```yaml
source:
  type: looker
  config:
    # ... source config ...

# Optional recording configuration
recording:
  enabled: true
  password: ${DATAHUB_RECORDING_PASSWORD} # Or use --record-password CLI flag
  s3_upload: true # Upload directly to S3 (default: false)
  output_path: s3://my-bucket/recordings/ # Required when s3_upload=true
```
When `s3_upload` is disabled (the default), the recording is saved locally, in order of precedence:

1. `output_path`, if specified
2. The `INGESTION_ARTIFACT_DIR` directory, if set
3. A temp directory otherwise

The following environment variables affect recording:

| Variable | Description |
|---|---|
| `DATAHUB_RECORDING_PASSWORD` | Default password for recording encryption |
| `ADMIN_PASSWORD` | Fallback password (used in managed environments) |
| `INGESTION_ARTIFACT_DIR` | Directory to save recordings when S3 upload is disabled. If not set, recordings are saved to a temp directory. |
Recording:

```bash
datahub ingest run -c recipe.yaml \
  --record                      # Enable recording
  --record-password <pwd>       # Encryption password
  --record-output-path <path>   # Override output path (for debugging)
  --no-s3-upload                # Disable S3 upload
  --no-secret-redaction         # Keep real credentials (for local debugging)

# Or save to a specific directory
export INGESTION_ARTIFACT_DIR=/path/to/recordings
datahub ingest run -c recipe.yaml --record --record-password <pwd> --no-s3-upload
# Recording saved as: /path/to/recordings/recording-{run_id}.zip
```
Replay:

```bash
datahub ingest replay <archive> \
  --password <pwd>     # Decryption password
  --live-sink          # Enable real GMS sink
  --server <url>       # GMS server for live sink
  --token <token>      # Auth token for live sink
```
```
recording-{run_id}.zip (AES-256 encrypted, LZMA compressed)
├── manifest.json        # Metadata, versions, checksums
├── recipe.yaml          # Recipe with redacted secrets
├── http/
│   └── cassette.yaml    # VCR HTTP recordings (YAML for binary data support)
└── db/
    └── queries.jsonl    # Database query recordings
```
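Because the archive is a standard AES-encrypted ZIP produced with pyzipper, it can also be inspected directly from Python. A minimal sketch, assuming the layout above and the password `mysecret`:

```python
import json

import pyzipper  # pip install pyzipper

# Recording archives are AES-256 encrypted, LZMA compressed ZIPs.
with pyzipper.AESZipFile("recording.zip") as zf:
    zf.setpassword(b"mysecret")
    print(zf.namelist())  # e.g. ['manifest.json', 'recipe.yaml', ...]
    manifest = json.loads(zf.read("manifest.json"))
    print(manifest["run_id"], manifest["datahub_cli_version"])
```

For normal use, prefer `datahub recording info` and `datahub recording extract`, which also report checksum errors.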
```json
{
  "format_version": "1.0.0",
  "run_id": "looker-2024-12-03-10_30_00-abc123",
  "source_type": "looker",
  "sink_type": "datahub-rest",
  "datahub_cli_version": "0.14.0",
  "python_version": "3.10.15",
  "created_at": "2024-12-03T10:35:00Z",
  "recording_start_time": "2024-12-03T10:30:00Z",
  "files": ["http/cassette.yaml", "db/queries.jsonl"],
  "checksums": { "http/cassette.yaml": "sha256:..." },
  "has_exception": false,
  "exception_info": null
}
```
- `source_type`: The type of source connector (e.g., snowflake, looker, bigquery)
- `sink_type`: The type of sink (e.g., datahub-rest, file)
- `datahub_cli_version`: The DataHub CLI version used for recording
- `python_version`: The Python version used for recording (e.g., "3.10.15")
- `recording_start_time`: When recording began (informational)
- `has_exception`: Whether the recording captured an exception
- `exception_info`: Stack trace and details if an exception occurred

Store the recording password in a secure location (a secrets manager or environment variable) and use the same password across your team:
```bash
export DATAHUB_RECORDING_PASSWORD=$(vault read -field=password secret/datahub/recording)
datahub ingest run -c recipe.yaml --record
```
For best debugging results, record in an environment that matches production as closely as possible.
The archive filename includes the `run_id`. Use meaningful recipe names for easy identification:

```bash
# Recipe: snowflake-prod-daily.yaml
# Archive: snowflake-prod-daily-2024-12-03-10_30_00-abc123.zip
```
After recording, test the replay to ensure the recording is complete:
```bash
# Record (save MCP output for comparison)
datahub ingest run -c recipe.yaml --record --record-password test --no-s3-upload \
  | tee recording_output.json

# Immediately test replay (save output)
datahub ingest replay /tmp/recording.zip --password test \
  | tee replay_output.json

# Verify semantic equivalence
datahub check metadata-diff \
  --ignore-path "root['*']['systemMetadata']['lastObserved']" \
  --ignore-path "root['*']['systemMetadata']['runId']" \
  recording_output.json replay_output.json
```
If recording captures an exception, the archive includes exception details:
```bash
datahub recording info recording.zip --password mysecret
# Output includes: has_exception: true, exception_info: {...}
```
For faster recordings and smaller archives, limit the scope:
```yaml
source:
  type: looker
  config:
    dashboard_pattern:
      allow:
        - "^specific-dashboard-id$"
```
To capture all HTTP requests reliably, recording serializes HTTP calls. This has performance implications:
| Scenario | Without Recording | With Recording |
|---|---|---|
| Parallel API calls | ~10s | ~90s |
| Single-threaded | ~90s | ~90s |
Mitigation: Recording is intended for debugging, not production. Use `--no-s3-upload` for faster local testing.
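The slowdown exists because concurrent requests must be funneled through one ordered path so the cassette stays deterministic. Conceptually it behaves like a global lock around each request (a simplified illustration, not the recorder's actual internals):

```python
import threading

import requests

_record_lock = threading.Lock()  # one in-flight request at a time

def recorded_get(url: str, **kwargs) -> requests.Response:
    # Serializing requests keeps cassette ordering deterministic,
    # which is why parallel workloads degrade to single-threaded speed.
    with _record_lock:
        return requests.get(url, **kwargs)
```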
MCP metadata timestamps will always differ between recording and replay:
- `systemMetadata.lastObserved` - set when MCP is emitted
- `systemMetadata.runId` - unique per run
- `auditStamp.time` - set during processing

Mitigation: The actual source data is identical. Use `datahub check metadata-diff` with `--ignore-path` to verify semantic equivalence (see "Comparing Recording and Replay Output" above).
Some sources exhibit non-deterministic behavior, such as issuing API requests in varying order.
Mitigation: The replay serves recorded API responses, so data is identical. The system includes custom VCR matchers that handle non-deterministic request ordering (e.g., Looker usage queries with varying filter orders).
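vcrpy supports registering custom request matchers for exactly this situation. A sketch of the pattern (the matcher below is illustrative; DataHub's actual matchers live in the recording module):

```python
import urllib.parse

import vcr  # vcrpy


def match_normalized_query(r1, r2):
    # Compare query strings as unordered key/value mappings so requests
    # whose filter parameters arrive in a different order still match.
    q1 = urllib.parse.parse_qs(urllib.parse.urlparse(r1.uri).query)
    q2 = urllib.parse.parse_qs(urllib.parse.urlparse(r2.uri).query)
    return q1 == q2


my_vcr = vcr.VCR()
my_vcr.register_matcher("normalized_query", match_normalized_query)
my_vcr.match_on = ["method", "host", "path", "normalized_query"]
```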
Database replay mocks the connection entirely, so authentication is bypassed. This means credential and connection-level issues cannot be reproduced during replay.
Mitigation: For complex database debugging, use database-specific profiling tools alongside recording.
Recordings can be large for high-volume sources.

Mitigation: Limit the ingestion scope with allow/deny patterns (see the scoped recipe example above); the archive's LZMA compression also helps keep sizes down.
Secrets are redacted in the stored recipe using `__REPLAY_DUMMY__` markers. During replay, source configs that validate credential formats would otherwise reject these placeholders.

Mitigation: The replay system auto-injects valid dummy values that pass common validators.
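A simplified picture of what redaction does to a recipe before it is stored (the helper and key list below are hypothetical, for illustration only):

```python
REDACTED = "__REPLAY_DUMMY__"
SENSITIVE_KEYS = {"password", "token", "client_secret", "api_key"}  # illustrative


def redact(node):
    # Recursively replace values under sensitive-looking keys so the
    # stored recipe.yaml never contains real credentials.
    if isinstance(node, dict):
        return {
            k: REDACTED if k.lower() in SENSITIVE_KEYS else redact(v)
            for k, v in node.items()
        }
    if isinstance(node, list):
        return [redact(v) for v in node]
    return node
```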
Sources that use non-HTTP protocols cannot be fully recorded.

Mitigation: Most sources use HTTP REST APIs, which are fully supported.
Some database connectors use non-standard HTTP implementations:
- Snowflake bundles vendored HTTP libraries: `snowflake.connector.vendored.urllib3` and `vendored.requests`

Impact: HTTP authentication calls are NOT recorded during connection setup.

Why recording still works:

- Authentication happens live during `connect()`

What IS recorded:

- All SQL queries and results, intercepted at `cursor.execute()`

What is NOT recorded:

- HTTP traffic issued through the vendored libraries during connection setup
Automatic error handling: The recording system detects when VCR interferes with the connection and automatically retries with VCR bypassed. You'll see warnings in the logs, but recording will succeed. SQL queries are captured normally regardless of HTTP recording status.
For debugging: SQL query recordings are sufficient for all metadata extraction scenarios.
Stateful ingestion checkpoints may behave differently during replay.
Mitigation: For stateful debugging, record a fresh run without existing state.
Large recordings are loaded fully into memory during replay.
Mitigation: For very large recordings, extract and inspect specific parts:
```bash
datahub recording extract recording.zip --password mysecret --output-dir ./extracted
# Manually inspect http/cassette.yaml
```
The recording module uses lazy imports to avoid requiring optional dependencies when recording is not used:
- `sqlalchemy` is only imported when actually recording/replaying SQLAlchemy-based sources
- `RecordingConfig`, `IngestionRecorder`, and `IngestionReplayer` are imported on demand via `__getattr__`

Impact: This is transparent to users - the recording system works exactly the same, but with better dependency isolation. The `debug-recording` plugin is designed to be installed alongside source connectors, not as a standalone package. Dependencies like `sqlalchemy` are expected to be provided by the source connector itself when needed.
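The on-demand imports rely on module-level `__getattr__` (PEP 562). A minimal sketch of the pattern (module paths are assumptions for illustration):

```python
# recording/__init__.py - sketch of the lazy-import pattern


def __getattr__(name: str):
    # Heavy dependencies are imported only when a symbol is first
    # accessed, so importing the recording package stays cheap.
    if name == "IngestionRecorder":
        from .recorder import IngestionRecorder  # assumed module path

        return IngestionRecorder
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```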
| Source | HTTP Recording | Notes |
|---|---|---|
| Looker | ✅ | Full support including SDK calls |
| PowerBI | ✅ | Full support |
| Tableau | ✅ | Full support |
| Superset | ✅ | Full support |
| Mode | ✅ | Full support |
| Sigma | ✅ | Full support |
| dbt Cloud | ✅ | Full support |
| Fivetran | ✅ | Full support |
| Source | HTTP Recording | DB Recording | Strategy | Notes |
|---|---|---|---|---|
| Snowflake | ❌ Not needed | ✅ Full | Connection wrapper | Native connector wrapped at connect() |
| Redshift | N/A | ✅ Full | Connection wrapper | Native connector wrapped at connect() |
| Databricks | ❌ Not needed | ✅ Full | Connection wrapper | Native connector wrapped at connect() |
| BigQuery | ✅ (REST API) | ✅ Full | Client wrapper | Client class wrapped |
| PostgreSQL | N/A | ✅ Full | Connection.execute() wrapper | SQLAlchemy connection.execute() wrapped |
| MySQL | N/A | ✅ Full | Connection.execute() wrapper | SQLAlchemy connection.execute() wrapped |
| SQLite | N/A | ✅ Full | Connection.execute() wrapper | SQLAlchemy connection.execute() wrapped |
| MSSQL | N/A | ✅ Full | Connection.execute() wrapper | SQLAlchemy connection.execute() wrapped |
Note: File staging operations (PUT/GET) are not used in metadata extraction and are therefore not a concern for recording/replay.
The recording system uses a hybrid approach that selects the best interception method for each database connector type:
1. Wrapper Strategy (Native Connectors)
- Used when a connector exposes a `connect()` function or `Client` class
- Works for native connectors whose `connect()` functions return connections we can wrap
- `ConnectionProxy` wraps the real connection; `CursorProxy` intercepts queries

2. Connection.execute() Wrapper Strategy (SQLAlchemy-based)

- Wraps `engine.connect()` to intercept connections, then wraps `connection.execute()` to capture queries and results
- Intercepts at the `execute()` level, avoiding import-reference issues with modules that import `create_engine` directly
- Patches `engine.connect()` to return connections with wrapped `execute()` methods that materialize and record results
- Works even for modules that import `create_engine` directly (e.g., `from sqlalchemy import create_engine`), avoiding stale reference issues

Why Different Strategies?

- Native connectors expose `connect()` functions that are easy to wrap
- For SQLAlchemy sources, wrapping `connection.execute()` captures `Result` objects directly, avoiding issues with modules that import `create_engine` directly

Both strategies achieve the same goal of intercepting SQL queries and results for recording/replay, but use the most appropriate method for each connector's architecture.
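In simplified form, the wrapper strategy looks like the sketch below (illustration only; the real proxies also handle fetch variants, errors, and replay):

```python
from typing import Any, Dict, List


class CursorProxy:
    """Records every query executed on a real DB-API cursor."""

    def __init__(self, cursor: Any, log: List[Dict[str, Any]]) -> None:
        self._cursor = cursor
        self._log = log  # shared list, later written out as queries.jsonl
        self._rows: list = []

    def execute(self, query: str, *args: Any, **kwargs: Any) -> "CursorProxy":
        self._cursor.execute(query, *args, **kwargs)
        # Materialize results so they can be stored and replayed later.
        self._rows = self._cursor.fetchall()
        self._log.append({"query": query, "rows": self._rows})
        return self

    def fetchall(self) -> list:
        return self._rows


class ConnectionProxy:
    """Wraps a real connection so every cursor it hands out is proxied."""

    def __init__(self, conn: Any, log: List[Dict[str, Any]]) -> None:
        self._conn = conn
        self._log = log

    def cursor(self, *args: Any, **kwargs: Any) -> CursorProxy:
        return CursorProxy(self._conn.cursor(*args, **kwargs), self._log)
```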
Database sources have a two-phase execution model:
Phase 1: Authentication (during `connect()`)

- Runs live against the real service; any HTTP traffic here may bypass VCR (see the vendored HTTP stacks above)

Phase 2: SQL Execution (after `connect()`)

- Queries are intercepted by `CursorProxy` (works for both wrapper strategies)
- Queries and results are recorded to `queries.jsonl`

This architecture makes recording resilient to HTTP library changes while maintaining perfect SQL replay fidelity. For Snowflake and Databricks, all metadata extraction happens via SQL queries in Phase 2, making HTTP recording unnecessary.
| Component | Recording | Notes |
|---|---|---|
| GMS REST API | ✅ | Sink emissions captured |
| GraphQL API | ✅ | If used by source |
| Stateful Backend | ✅ | Checkpoint calls captured |
Install the `debug-recording` plugin:

```bash
pip install 'acryl-datahub[debug-recording]'
```
The archive may be corrupted. Re-download or re-record:
```bash
datahub recording info recording.zip --password mysecret
# Check for checksum errors in output
```
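The checksums can also be verified by hand against the manifest. A sketch, assuming the `sha256:`-prefixed digest format shown in the manifest example above:

```python
import hashlib
import json

import pyzipper

with pyzipper.AESZipFile("recording.zip") as zf:
    zf.setpassword(b"mysecret")
    manifest = json.loads(zf.read("manifest.json"))
    for name, expected in manifest["checksums"].items():
        digest = hashlib.sha256(zf.read(name)).hexdigest()
        status = "OK" if f"sha256:{digest}" == expected else "CORRUPT"
        print(f"{name}: {status}")
```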
The recorded cassette doesn't have a matching request. This can happen if:

- The source configuration changed since recording, so different requests are issued
- The recording is incomplete (check `has_exception` in the manifest)

Solution: Re-record with the exact same configuration.
A small difference in event count (e.g., 3259 vs 3251) is normal; run-specific events, such as stateful ingestion checkpoint emissions, can vary between runs.
Verification: Use datahub check metadata-diff to confirm semantic equivalence:
```bash
datahub check metadata-diff \
  --ignore-path "root['*']['systemMetadata']['lastObserved']" \
  --ignore-path "root['*']['systemMetadata']['runId']" \
  recording_output.json replay_output.json
```
A "PERFECT SEMANTIC MATCH" confirms the replay is correct despite count differences.
HTTP requests are serialized during recording for reliability. To speed up:

- Limit the ingestion scope (see the scoped recipe example above)
- Use `--no-s3-upload` for local testing

Large archives may time out during upload:
```bash
# Record locally first
datahub ingest run -c recipe.yaml --record --record-password mysecret --no-s3-upload

# Upload manually with multipart
aws s3 cp recording.zip s3://bucket/recordings/ --expected-size $(stat -f%z recording.zip)
```
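The same manual upload can be scripted with boto3, whose transfer manager switches to multipart automatically above a size threshold (bucket and key names below are placeholders):

```python
import boto3
from boto3.s3.transfer import TransferConfig

# Multipart engages above multipart_threshold; larger chunks reduce
# request overhead for big archives on slow links.
config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,
    multipart_chunksize=64 * 1024 * 1024,
)

s3 = boto3.client("s3")
s3.upload_file(
    "recording.zip", "my-bucket", "recordings/recording.zip", Config=config
)
```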
```
┌─────────────────────────────────────────────────────────────┐
│ IngestionRecorder │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ HTTPRecorder │ │ ModulePatcher │ │ QueryRecorder│ │
│ │ (VCR.py) │ │ (DB proxies) │ │ (JSONL) │ │
│ └────────┬────────┘ └────────┬────────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐│
│ │ Encrypted Archive ││
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐ ││
│ │ │manifest │ │ recipe │ │cassette │ │queries.jsonl│ ││
│ │ │.json │ │ .yaml │ │.yaml │ │ │ ││
│ │ └──────────┘ └──────────┘ └──────────┘ └────────────┘ ││
│ └─────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ IngestionReplayer │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ HTTPReplayer │ │ ReplayPatcher │ │ QueryReplayer│ │
│ │ (VCR replay) │ │ (Mock conns) │ │ (Mock cursor│ │
│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Air-Gapped Replay │ │
│ │ - No network required │ │
│ │ - Full debugger support │ │
│ │ - Exact reproduction │ │
│ └──────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
When adding new source connectors:

- HTTP-based sources are recorded automatically via VCR and usually need no changes
- Database sources with non-standard connectors may need a wrapper registered in `patcher.py` for their specific connector