docs/how/load-indices.md
LoadIndices is a high-performance upgrade task designed for bulk loading metadata aspects directly from the database into Elasticsearch/OpenSearch indices. Unlike RestoreIndices, which focuses on correctness and consistency, LoadIndices is optimized for speed and throughput during initial deployments or large-scale data migrations.
LoadIndices bypasses the standard event-driven processing pipeline to directly stream data from the metadata_aspect_v2 table into search indices using optimized bulk operations. This approach provides significant performance improvements for large installations while making specific architectural trade-offs that prioritize speed over consistency.
🚨 CRITICAL WARNING: LoadIndices is designed for specific use cases only and should NEVER be used in production environments with active concurrent writes, MCL-dependent systems, or real-time consistency requirements. See Performance Trade-offs & Implications for complete details.
⚠️ Critical Understanding: LoadIndices prioritizes performance over consistency by making several architectural trade-offs. Understanding these implications is crucial before using LoadIndices in production environments.
LoadIndices flow: Database → LoadIndices → Elasticsearch, versus the normal flow of Database → Kafka MCL → Multiple Consumers → Elasticsearch/Graph/etc.

- Normal pipeline: Metadata Change → MCL Event → Kafka → Multiple Consumers → Various Stores
- LoadIndices: Metadata Change → LoadIndices → Direct ES Write (skips Kafka entirely)
- Uses TxIsolation.READ_UNCOMMITTED for faster database scanning

❌ DO NOT use LoadIndices if you have:
✅ Safe to use LoadIndices when:
Before using LoadIndices in any environment:
Verify Minimal Infrastructure:
metadata_aspect_v2 table accessible (via Ebean ORM)

Stop All Ingestion (if DataHub is running):
# Disable all Kafka consumers
kubectl scale deployment --replicas=0 datahub-mae-consumer
kubectl scale deployment --replicas=0 datahub-mce-consumer
kubectl scale deployment --replicas=0 datahub-gms
Check Database Configuration:
# Check if using Cassandra (LoadIndices NOT supported)
grep -i cassandra /path/to/datahub/docker/docker-compose.yml
# Verify MySQL/PostgreSQL database is configured
grep -E "mysql|postgres" /path/to/datahub/docker/docker-compose.yml
# ⚠️ If Cassandra is detected, LoadIndices is NOT available
# Must use RestoreIndices instead
Check Graph Storage Configuration:
# Check if using Neo4j (graph updates will be MISSING)
grep -r "neo4j" /path/to/datahub/docker/docker-compose.yml
# Check DataHub configuration for graph service selection
grep -i "graph.*elasticsearch\|neo4j" /path/to/datahub/conf/application.yml
# ⚠️ If Neo4j is detected, LoadIndices will NOT update the graph
Verify No Concurrent Writes:
# Check for active Elasticsearch indexing
curl -s "localhost:9200/_nodes/stats" | grep "index_current"
# Should show "index_current": 0
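The same check can be scripted by summing `index_current` across all nodes. The sketch below uses a canned `_nodes/stats` sample in place of the live `curl` call (shown commented out), so the parsing can be tested offline; adapt the host to your cluster.

```shell
# Sketch: confirm no in-flight indexing before starting LoadIndices.
# The canned JSON sample stands in for the live curl call below it.
stats='{"nodes":{"n1":{"indices":{"indexing":{"index_current":0}}},"n2":{"indices":{"indexing":{"index_current":0}}}}}'
# stats=$(curl -s "localhost:9200/_nodes/stats")   # live cluster
# Extract every index_current value and sum them across nodes
active=$(printf '%s' "$stats" \
  | grep -o '"index_current":[0-9]*' \
  | grep -o '[0-9]*$' \
  | awk '{s+=$1} END {print s+0}')
echo "$active"
```

A total of 0 means no active indexing; any other value means concurrent writes are in flight and LoadIndices should not be started.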
Index Clean State:
# Ensure clean indexing state
curl -s "localhost:9200/_nodes/stats" | grep -E "refresh.*active"
Coordinate with Operations:
| Level | LoadIndices | RestoreIndices |
|---|---|---|
| URN-level Ordering | ✅ Guaranteed | ✅ Guaranteed |
| Real-time Searchability | ❌ Delayed | ✅ Immediate |
| Graph Service Updates (ES-based) | ✅ Updated | ✅ Updated |
| Graph Service Updates (Neo4j-based) | ❌ Missing | ✅ Updated |
| MCL Event Propagation | ❌ Bypassed | ✅ Full propagation |
| Concurrent Write Safety | ❌ Not safe | ✅ Safe |
# If issues arise, prepare rollback:
# 1. Stop LoadIndices immediately
# 2. Restore from backup indices
# 3. Re-run with RestoreIndices for correctness
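One way to script step 2 is an Elasticsearch snapshot restore. The repository name, snapshot name, and index pattern below are placeholders for whatever backup you took before running LoadIndices; the actual restore call is left commented out.

```shell
# Hypothetical rollback sketch: restore indices from a pre-LoadIndices snapshot.
# SNAPSHOT_REPO, SNAPSHOT_NAME, and the index pattern are placeholders.
ES_HOST="localhost:9200"
SNAPSHOT_REPO="backup_repo"
SNAPSHOT_NAME="pre_loadindices"
# Build the snapshot restore endpoint URL
restore_url="http://${ES_HOST}/_snapshot/${SNAPSHOT_REPO}/${SNAPSHOT_NAME}/_restore"
echo "$restore_url"
# curl -X POST "$restore_url" -H 'Content-Type: application/json' \
#   -d '{"indices": "*index_v2", "include_global_state": false}'
```

Close or delete the damaged indices first; Elasticsearch will refuse to restore over open indices of the same name.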
LoadIndices operates as an upgrade task that can run independently without requiring DataHub services to be running. It consists of two main steps:
Key Advantage: LoadIndices only requires:
This enables offline bulk operations during maintenance windows or initial deployments where DataHub infrastructure is being set up incrementally.
Index Creation: The BuildIndicesStep automatically creates all required Elasticsearch indices based on IndexConvention patterns, so empty Elasticsearch clusters are fully supported.
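A quick sanity check after BuildIndicesStep runs is to count the indices it created. Index names follow your IndexConvention prefix, so the canned sample names below are illustrative; the live `_cat/indices` call is commented out.

```shell
# Sketch: count indices after BuildIndicesStep; names vary with your IndexConvention.
# The canned sample stands in for the live curl call below it.
indices='datasetindex_v2
chartindex_v2
graph_service_v1'
# indices=$(curl -s "localhost:9200/_cat/indices?h=index")   # live cluster
# Count non-empty lines, i.e. the number of indices present
count=$(printf '%s\n' "$indices" | grep -c .)
echo "$count"
```

An empty result on a live cluster means BuildIndicesStep has not run (or pointed at the wrong cluster).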
graph TD
A[LoadIndices Upgrade] --> B[BuildIndicesStep]
B --> C[Create/Configure Indices]
C --> D[LoadIndicesStep]
D --> E[Disable Refresh Intervals]
E --> F[Stream Aspects from DB]
F --> G[Batch Processing]
G --> H[Convert to MCL Events]
H --> I[Bulk Write to ES]
I --> J[Restore Refresh Intervals]
| Aspect | RestoreIndices | LoadIndices |
|---|---|---|
| Purpose | Correctness & consistency | Speed & throughput |
| Processing | Event-driven via MCL events | Direct bulk operations |
| Isolation | READ_COMMITTED | READ_UNCOMMITTED |
| Refresh Management | Static configuration | Dynamic disable/restore |
| Performance Focus | Accurate replay | Maximal speed |
| Use Case | Recovery from inconsistencies | Initial loads & migrations |
Key Benefit: LoadIndices can run with minimal infrastructure without requiring DataHub services to be running:
# Minimal requirements
✅ MySQL/PostgreSQL database (with metadata_aspect_v2 table)
✅ Elasticsearch/OpenSearch cluster
❌ DataHub GMS/MAE services - NOT needed
❌ Kafka cluster - NOT needed
❌ Frontend services - NOT needed
LoadIndices can be executed via:
# From datahub-upgrade directory
./gradlew runLoadIndices
# With custom thread count
./gradlew runLoadIndices -PesThreadCount=6
IDE Execution: Run UpgradeTask.main() with LoadIndices arguments
Standalone JAR: Build and run datahub-upgrade JAR independently
| Argument | Description | Default | Example |
|---|---|---|---|
| `batchSize` | Number of aspects per batch for processing | 10000 | `-a batchSize=5000` |
| `limit` | Maximum number of aspects to process | Integer.MAX_VALUE (no limit) | `-a limit=50000` |
| Argument | Description | Example |
|---|---|---|
| `gePitEpochMs` | Only process aspects created after this timestamp (milliseconds) | `-a gePitEpochMs=1609459200000` |
| `lePitEpochMs` | Only process aspects created before this timestamp (milliseconds) | `-a lePitEpochMs=1640995200000` |
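These values are epoch milliseconds; one way to derive them for a given cutoff date (assuming GNU `date`, as found on most Linux hosts) is:

```shell
# Sketch: derive the millisecond epoch for a cutoff date.
# GNU date is assumed; on macOS use `date -j -u -f '%Y-%m-%d %H:%M:%S' ...` instead.
cutoff_ms=$(( $(date -u -d '2021-01-01 00:00:00' +%s) * 1000 ))
echo "$cutoff_ms"
```

For 2021-01-01 UTC this yields 1609459200000, the `gePitEpochMs` example value in the table above.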
| Argument | Description | Example |
|---|---|---|
| `urnLike` | SQL LIKE pattern to filter URNs | `-a urnLike=urn:li:dataset:%` |
| `aspectNames` | Comma-separated list of aspect names to process | `-a aspectNames=ownership,schemaMetadata` |
| `lastUrn` | Resume processing from this URN (inclusive) | `-a lastUrn=urn:li:dataset:my-dataset` |
| Environment Variable | Description | Default | Example |
|---|---|---|---|
| `ELASTICSEARCH_THREAD_COUNT` | Number of I/O threads for BulkProcessor | 2 (app config), 4 (Gradle task) | `ELASTICSEARCH_THREAD_COUNT=4` |
| `ES_BULK_ASYNC` | Enable asynchronous bulk operations | true | `ES_BULK_ASYNC=true` |
| `ES_BULK_REQUESTS_LIMIT` | Maximum bulk requests per buffer | 10000 | `ES_BULK_REQUESTS_LIMIT=15000` |
| `ES_BULK_FLUSH_PERIOD` | Bulk flush interval in seconds | 300 (5 minutes) | `ES_BULK_FLUSH_PERIOD=300` |
If you're using Docker Compose with the DataHub source repository:
# Basic LoadIndices execution
./docker/datahub-upgrade/datahub-upgrade.sh -u LoadIndices
# LoadIndices with performance tuning
./docker/datahub-upgrade/datahub-upgrade.sh -u LoadIndices \
-a batchSize=15000 \
-a limit=100000
For development and testing environments:
# Run LoadIndices with default settings
./gradlew :datahub-upgrade:runLoadIndices
# Run with custom thread count and batch size
./gradlew :datahub-upgrade:runLoadIndices \
-PesThreadCount=4 \
-PbatchSize=15000 \
-Plimit=50000
The Gradle task supports these parameters:
- `esThreadCount`: Set ELASTICSEARCH_THREAD_COUNT (default: 4)
- `batchSize`: Override batch size (default: 10000)
- `limit`: Set processing limit
- `urnLike`: Filter by URN pattern
- `aspectNames`: Filter by aspect names
- `lePitEpochMs`: Process records created before this timestamp
- `gePitEpochMs`: Process records created after this timestamp
- `lastUrn`: Resume processing from this URN (inclusive)

Configure LoadIndices through Docker environment:
# Target specific entity types
docker run --rm datahub-upgrade \
-u LoadIndices \
-a urnLike=urn:li:dataset:% \
-a batchSize=20000
# Process specific aspects only
docker run --rm datahub-upgrade \
-u LoadIndices \
-a aspectNames=ownership,status,schemaMetadata \
-a batchSize=15000
# Time-based filtering
docker run --rm datahub-upgrade \
-u LoadIndices \
-a gePitEpochMs=1640995200000 \
-a limit=50000
# Resume from a specific URN
docker run --rm datahub-upgrade \
-u LoadIndices \
-a lastUrn=urn:li:dataset:my-dataset \
-a batchSize=10000
LoadIndices supports resuming from a specific URN when processing is interrupted:
When LoadIndices runs, it logs the last URN processed in each batch:
Batch completed - Last URN processed: urn:li:dataset:my-dataset
Processed 10000 aspects - 150.2 aspects/sec - Last URN: urn:li:dataset:my-dataset
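If a run dies mid-way, the resume URN can be scraped from the log tail. The sketch below parses a canned sample of the log line shown above; on a real host you would feed it the tail of wherever your upgrade logs land (the path is deployment-specific).

```shell
# Sketch: recover the last processed URN from LoadIndices log output.
# The sample line stands in for tailing your actual log file.
log_line='Processed 10000 aspects - 150.2 aspects/sec - Last URN: urn:li:dataset:my-dataset'
# Strip everything up to and including "Last URN...: ", keeping the URN itself
last_urn=$(printf '%s\n' "$log_line" | sed -n 's/.*Last URN[^:]*: //p' | tail -1)
echo "$last_urn"
```

The extracted value is what you pass to `-a lastUrn=...` when resuming.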
To resume from where you left off:
# Resume from the last URN that was successfully processed
./gradlew :datahub-upgrade:runLoadIndices \
-a lastUrn=urn:li:dataset:my-dataset \
-a batchSize=10000
The `lastUrn` parameter processes from the specified URN onwards (inclusive).

# 1. Start initial processing
./gradlew :datahub-upgrade:runLoadIndices -a batchSize=5000
# 2. If interrupted, check logs for last URN:
# "Batch completed - Last URN processed: urn:li:dataset:my-dataset"
# 3. Resume from that URN
./gradlew :datahub-upgrade:runLoadIndices \
-a lastUrn=urn:li:dataset:my-dataset \
-a batchSize=5000
# Optimize bulk settings for LoadIndices
export ES_BULK_REQUESTS_LIMIT=15000
export ES_BULK_FLUSH_PERIOD=10
export ES_BULK_ASYNC=true
export ELASTICSEARCH_THREAD_COUNT=4
LoadIndices automatically configures connection pooling based on thread count:
# datahub-upgrade/build.gradle configuration
environment "ELASTICSEARCH_THREAD_COUNT", "4" # Auto-adjusts maxConnectionsPerRoute
Understanding when to use LoadIndices vs RestoreIndices is crucial for optimal performance and data consistency.
| Aspect | RestoreIndices | LoadIndices |
|---|---|---|
| Primary Purpose | Data consistency & correctness | Speed & throughput |
| Design Philosophy | Event-driven precision | Performance optimization |
| Consistency Model | Full consistency guarantee | Speed-optimized trade-offs |
| Use Case | Production recovery | Bulk migrations & initial loads |
| Feature | RestoreIndices | LoadIndices |
|---|---|---|
| Database Isolation | READ_COMMITTED | READ_UNCOMMITTED |
| MCL Events | ✅ Full MCL pipeline | ❌ Bypasses MCL entirely |
| Graph Updates (Elasticsearch) | ✅ Updated | ✅ Updated |
| Graph Updates (Neo4j) | ✅ Updated | ❌ Missing |
| Database Support | MySQL, PostgreSQL, Cassandra | MySQL, PostgreSQL only |
| Performance | Slower, safer | Faster, optimized |
| Real-time Consistency | ✅ Immediate | ❌ Delayed until refresh |
| Concurrency Safety | ✅ Safe | ❌ Not safe |