Use the Important Capabilities table above as the source of truth for supported features and whether additional configuration is required.
:::tip Quick Start: Auto-Deploy for Semantic Search
To enable automatic semantic search indexing for your documents, deploy this source to DataHub with a simple command:
```bash
# Create a minimal recipe and deploy it with an hourly schedule
cat > /tmp/datahub-docs.yml << 'EOF'
source:
  type: datahub-documents
  config: {}
EOF

datahub ingest deploy -c /tmp/datahub-docs.yml --name "document-embeddings" --schedule "0 * * * *"
```
This creates a managed ingestion source in DataHub that automatically processes documents every hour and generates embeddings for semantic search.
**What this does:**

- Creates a managed ingestion source in DataHub
- Runs on an hourly schedule (`0 * * * *`) to keep embeddings up-to-date

**Alternative schedules:**

```bash
# Every 15 minutes: "*/15 * * * *"
# Every 6 hours:    "0 */6 * * *"
# Daily at 2 AM:    "0 2 * * *"
```
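If you want to sanity-check a cron expression before deploying, a minimal sketch using the third-party `croniter` package (not part of DataHub) prints the next few run times:

```python
# Preview upcoming runs for a cron schedule before deploying.
# Assumes the third-party `croniter` package (pip install croniter).
from datetime import datetime

from croniter import croniter

schedule = "0 2 * * *"  # daily at 2 AM
itr = croniter(schedule, datetime.now())
for _ in range(3):
    print(itr.get_next(datetime))  # next three scheduled run times
```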
Note: In future DataHub versions, GMS will run this automatically. For now, manual deployment is required.
:::
Process documents in real-time as they're created or updated:
```yaml
source:
  type: datahub-documents
  config:
    # Event mode enabled by default in recent versions
    event_mode:
      enabled: true
      idle_timeout_seconds: 60

sink:
  type: datahub-rest
  config: {}
```
**When to use:**
Process all documents in a single run:
```yaml
source:
  type: datahub-documents
  config:
    event_mode:
      enabled: false
    # Optional: process specific platforms only
    platform_filter: ["notion", "confluence"]

sink:
  type: datahub-rest
  config: {}
```
**When to use:**
Process documents from specific platforms only:
```yaml
source:
  type: datahub-documents
  config:
    # Process NATIVE documents + EXTERNAL documents from these platforms
    platform_filter: ["notion", "confluence"]
    incremental:
      enabled: true

sink:
  type: datahub-rest
  config: {}
```
Reprocess all documents regardless of content changes:
```yaml
source:
  type: datahub-documents
  config:
    incremental:
      enabled: true
      force_reprocess: true # Reprocess everything
      # Useful when:
      # - changing the chunking strategy
      # - updating the embedding model
      # - fixing processing issues

sink:
  type: datahub-rest
  config: {}
```
Override server configuration with local settings:
```yaml
source:
  type: datahub-documents
  config:
    # Custom chunking
    chunking:
      strategy: by_title # or 'basic'
      max_characters: 1000 # Larger chunks
      combine_text_under_n_chars: 200
    # Override embedding config (validated against the server)
    embedding:
      provider: bedrock
      model: cohere.embed-english-v3
      model_embedding_key: cohere_embed_v3
      aws_region: us-west-2
      batch_size: 50

sink:
  type: datahub-rest
  config: {}
```
⚠️ Warning: Custom embedding configs are validated against the server. Mismatches will cause errors.
```
1. Fetch Mode Selection
   ├─ Event Mode: Subscribe to Kafka MCL events
   └─ Batch Mode: GraphQL query for all documents

2. For Each Document:
   ├─ Check incremental state (skip if unchanged)
   ├─ Partition markdown → structured elements
   ├─ Chunk elements → semantic chunks
   │   ├─ by_title: preserves document structure
   │   └─ basic: fixed-size chunks with overlap
   ├─ Generate embeddings via LiteLLM
   │   └─ Batches of 25 (configurable)
   └─ Emit SemanticContent aspect → DataHub

3. State Management
   ├─ Batch Mode: track document content hashes
   └─ Event Mode: track Kafka offsets
```
**First Run (No State):** all documents are fetched and processed, and their content hashes are recorded.

**Subsequent Runs:** documents whose content hash is unchanged are skipped; only new or modified documents are reprocessed.

**Content Hash Calculation:**
```python
import hashlib
import json

hash_input = {
    "text": document.text,
    "partition_strategy": config.partition_strategy,
    "chunking_strategy": config.chunking.strategy,
    "max_characters": config.chunking.max_characters,
    # ... other chunking params
}
content_hash = hashlib.sha256(json.dumps(hash_input, sort_keys=True).encode()).hexdigest()
```
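Conceptually, the skip decision then reduces to comparing this hash against the saved state; a minimal sketch (illustrative, not the source's actual code):

```python
# Illustrative sketch of the incremental check. `state` maps document URN ->
# content hash from the previous run; it is empty on the first run, so every
# document is processed.
def should_process(urn: str, new_hash: str, state: dict[str, str]) -> bool:
    return state.get(urn) != new_hash
```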
**When Documents Are Reprocessed:**

- The content hash changes (the document text or chunking configuration changed)
- `force_reprocess: true` is set

The `platform_filter` setting controls which documents are processed:
**None (default):**

```yaml
platform_filter: null # or omit the field
```

**Specific Platforms:**

```yaml
platform_filter: ["notion", "confluence"]
```

**All Documents:**

```yaml
platform_filter: ["*"] # or ["ALL"]
```
```yaml
event_mode:
  enabled: true

  # Consumer ID for offset tracking
  consumer_id: "datahub-documents-{pipeline_name}" # Default

  # Kafka topics to consume
  topics:
    - "MetadataChangeLog_Versioned_v1"

  # Lookback window for the first run
  lookback_days: null # null = start from latest, or specify a number of days

  # Reset offsets to the beginning (DANGEROUS - reprocesses everything)
  reset_offsets: false

  # Exit after N seconds with no new events
  idle_timeout_seconds: 30

  # Kafka poll settings
  poll_timeout_seconds: 2
  poll_limit: 100
```
**by_title (Recommended):**

```yaml
chunking:
  strategy: by_title
  max_characters: 500
  combine_text_under_n_chars: 100
```

**basic:**

```yaml
chunking:
  strategy: basic
  max_characters: 500
  overlap: 50 # Character overlap between chunks
```
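To get a feel for how these settings split a document, you can call the underlying `unstructured` primitives directly; a minimal sketch, assuming the `unstructured` package (with markdown support) is installed:

```python
# Preview by_title chunking on a markdown string using the unstructured library.
from unstructured.partition.md import partition_md
from unstructured.chunking.title import chunk_by_title

markdown = "# Overview\n\nIntro text.\n\n## Details\n\nA longer body paragraph."
elements = partition_md(text=markdown)  # markdown -> structured elements
chunks = chunk_by_title(
    elements,
    max_characters=500,              # mirrors chunking.max_characters
    combine_text_under_n_chars=100,  # mirrors chunking.combine_text_under_n_chars
)
for chunk in chunks:
    print(repr(chunk.text))
```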
**Default (Fetch from Server):**

```yaml
embedding: {} # or omit entirely
```

**Override (Validated Against Server):**

```yaml
embedding:
  provider: bedrock # bedrock, cohere, or openai
  model: cohere.embed-english-v3
  model_embedding_key: cohere_embed_v3 # Must match the server!
  aws_region: us-west-2
  batch_size: 25
  input_type: search_document # Cohere-specific
```

**Break-Glass Override (NOT RECOMMENDED):**

```yaml
embedding:
  allow_local_embedding_config: true
  provider: bedrock
  model: cohere.embed-english-v3
  # ... other settings
```
```yaml
stateful_ingestion:
  enabled: true # Enabled by default

  # State backend configuration
  state_provider:
    type: datahub # Store state in DataHub
    config:
      datahub_api:
        server: "http://localhost:8080"
        token: "${DATAHUB_TOKEN}"

  # Ignore previous state (fresh start)
  ignore_old_state: false

  # Don't commit new state (dry run)
  ignore_new_state: false
```
```yaml
embedding:
  batch_size: 25 # Default
  # Increase for faster processing (if the provider supports it):
  # - Cohere: up to 96
  # - Bedrock: up to 100 (but rate-limited)
```
```yaml
event_mode:
  poll_limit: 100 # Default: fetch up to 100 events per poll
  # Increase for high-volume scenarios, e.g.:
  # poll_limit: 500 # process more events per batch
```
```yaml
# Skip short or empty documents
skip_empty_text: true
min_text_length: 50 # characters

# Process fewer documents
platform_filter: ["notion"] # only one platform
document_urns: # specific documents only
  - "urn:li:document:abc123"
```
The source reports the following metrics:
```python
report = {
    "num_documents_fetched": 100,           # Total documents fetched
    "num_documents_processed": 85,          # Successfully processed
    "num_documents_skipped": 15,            # Skipped (various reasons)
    "num_documents_skipped_unchanged": 10,  # Unchanged content
    "num_documents_skipped_empty": 5,       # Empty or too short
    "num_chunks_created": 425,              # Total chunks generated
    "num_embeddings_generated": 425,        # Total embeddings
    "processing_errors": [],                # List of errors
}
```
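In the example above the counters are internally consistent, and the embedding call volume follows from the batch size; a sketch of the assumed relationships (sanity checks, not an official contract):

```python
import math

# Assumed relationships between the counters above:
assert 85 + 15 == 100  # processed + skipped == fetched
assert 10 + 5 == 15    # skipped_unchanged + skipped_empty == skipped
# One embedding per chunk, so API call volume follows from batch_size:
print(math.ceil(425 / 25))  # 17 embedding calls at the default batch_size=25
```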
Enable debug logging for detailed insights:
```yaml
# In your ingestion recipe
source:
  type: datahub-documents
  config:
    # ... your config

# Set the log level via an environment variable:
# export DATAHUB_DEBUG=true
```
Look for these log messages:
"Loading embedding configuration from DataHub server...""✓ Loaded embedding configuration from server""Incremental mode enabled, state file: ...""Skipping document {urn} (unchanged content hash)"As of December 2024 in us-west-2:
Example Costs:
One-time Processing:
Incremental Updates (Event Mode):
Query Embeddings (GMS):
- Requires the `Document.text` field (markdown format expected)
- Markdown partitioning uses `unstructured.partition.md`, which may not handle all markdown variants
- Platform detection relies on the `sourceType` field (defaults to NATIVE if missing)
- External platform attribution uses `dataPlatformInstance` or URL-based platform extraction

Module behavior is constrained by source APIs, permissions, and metadata exposed by the platform. Refer to capability notes for unsupported or conditional features.
**Cause:** The server does not have semantic search configured.

**Solution:** Set `ELASTICSEARCH_SEMANTIC_SEARCH_ENABLED=true` in the server config.

**Cause:** Old DataHub server version (pre-v0.14.0).
**Solutions:**

- **Option 1 (Recommended):** Upgrade the DataHub server to v0.14.0+
- **Option 2:** Provide a local embedding config:

```yaml
embedding:
  provider: bedrock
  model: cohere.embed-english-v3
  model_embedding_key: cohere_embed_v3
  aws_region: us-west-2
```
**Error:**

```
Embedding configuration mismatch with server:
  - Model: local='cohere.embed-english-v3', server='amazon.titan-embed-text-v1'
```

**Cause:** The local config doesn't match the server configuration.

**Solution:**
**Possible Causes:**

**Platform Filter Too Restrictive:**

```yaml
# If you have NATIVE documents but filter for external platforms:
platform_filter: ["notion"] # Won't process NATIVE documents!

# Solution: remove the filter or use null
platform_filter: null
```
**All Documents Unchanged:**

- Set `incremental.force_reprocess: true` to reprocess them

**Documents Have No Text:**

- Verify that documents have a populated `Document.text` field
- Check the `min_text_length` threshold

**Symptoms:** Falls back to batch mode every run.
**Possible Causes:**

**Stateful Ingestion Disabled:**

```yaml
stateful_ingestion:
  enabled: true # Must be enabled for event mode
```
**Kafka Connection Issues:**

**State Provider Misconfigured:**

```yaml
stateful_ingestion:
  state_provider:
    type: datahub
    config:
      datahub_api:
        server: "http://correct-host:8080" # correct URL
```
**Error:**

```
Unable to load credentials from any provider in the chain
```

**Solutions:**

**Verify AWS_PROFILE:**

```bash
export AWS_PROFILE=datahub-dev
cat ~/.aws/credentials # check that the profile exists
```
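To confirm which identity the credential chain actually resolves to, a quick check with boto3 (assuming it is installed; `datahub-dev` is the example profile from above):

```python
# Print the caller identity that the AWS credential chain resolves to.
import boto3

session = boto3.Session(profile_name="datahub-dev")  # example profile
print(session.client("sts").get_caller_identity()["Arn"])
```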
**For EC2 Instance Role:**

```bash
# Check that the instance role is attached
curl http://169.254.169.254/latest/meta-data/iam/security-credentials/
```

**For ECS Task Role:**
**Optimization Strategies:**

**Increase Batch Size:**

```yaml
embedding:
  batch_size: 50 # up from the default of 25
```

**Use Event Mode:** process documents as they change instead of scanning everything each run.

**Filter Documents:**

```yaml
platform_filter: ["notion"] # process fewer platforms
min_text_length: 100 # skip short documents
```

**Optimize Chunking:**

```yaml
chunking:
  max_characters: 1000 # larger chunks = fewer embeddings
```
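As a rough illustration of why larger chunks reduce embedding volume (ignoring overlap and structure-aware splitting):

```python
# Rough chunk-count estimate: embeddings scale inversely with chunk size.
doc_chars = 10_000
print(doc_chars // 500)   # ~20 chunks/embeddings at max_characters=500
print(doc_chars // 1000)  # ~10 chunks/embeddings at max_characters=1000
```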
If ingestion fails, validate credentials, permissions, connectivity, and scope filters first. Then review ingestion logs for source-specific errors and adjust configuration accordingly.