Use the Important Capabilities table above as the source of truth for supported features and whether additional configuration is required.
Create a delta table using the sample PySpark code below if you don't have a delta table you can point to.
import uuid
import random
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

def generate_data():
    return [(y, m, d, str(uuid.uuid4()), str(random.randrange(10000) % 26 + 65) * 3, random.random()*10000)
            for d in range(1, 29)
            for m in range(1, 13)
            for y in range(2000, 2021)]

jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

table_path = "quickstart/my-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)

df = spark.read.format("delta").load(table_path)
df.show()
Create a DataHub ingestion YAML file (delta.dhub.yaml) to ingest metadata from the Delta table you just created.
source:
  type: "delta-lake"
  config:
    base_path: "quickstart/my-table"
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Note: Run the Spark code and the recipe from the same folder; otherwise, use absolute paths.
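As the note above implies, a relative base_path depends on the working directory. The sketch below prints the absolute form of a local table path and checks for the `_delta_log` folder that Delta tables keep alongside their data. It uses only the standard library; `describe_table_path` is a hypothetical helper for illustration and is not part of DataHub.

```python
from pathlib import Path


def describe_table_path(table_path: str) -> dict:
    """Return the absolute form of a local table path and whether it has a _delta_log folder."""
    absolute = Path(table_path).expanduser().resolve()
    return {
        "absolute_path": str(absolute),
        # Delta tables keep their transaction log in a _delta_log subfolder.
        "has_delta_log": (absolute / "_delta_log").is_dir(),
    }


if __name__ == "__main__":
    info = describe_table_path("quickstart/my-table")
    print(info["absolute_path"], info["has_delta_log"])
```

If `has_delta_log` comes back false, the path is probably being resolved from a different folder than the one the Spark code ran in.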
Execute the ingestion recipe:
datahub ingest -c delta.dhub.yaml
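The same recipe can also be run from Python rather than the CLI. The sketch below assumes DataHub's Python package and its Delta Lake plugin are installed; `build_local_recipe` and `run_recipe` are hypothetical helper names, and the DataHub import is done lazily so the recipe builder can be used on its own.

```python
def build_local_recipe(base_path: str, server: str = "http://localhost:8080") -> dict:
    """Build the same recipe as delta.dhub.yaml, expressed as a Python dict."""
    return {
        "source": {"type": "delta-lake", "config": {"base_path": base_path}},
        "sink": {"type": "datahub-rest", "config": {"server": server}},
    }


def run_recipe(recipe: dict) -> None:
    """Run a recipe in-process; needs DataHub with the delta-lake plugin installed."""
    # Imported lazily so build_local_recipe works without DataHub installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    # Raise if the run reported failures instead of exiting silently.
    pipeline.raise_from_status()
```

A call such as `run_recipe(build_local_recipe("quickstart/my-table"))` would then take the place of the CLI command.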
Step 1: For a Delta table on S3, set up your AWS credentials by creating an AWS credentials config file, typically at '$HOME/.aws/credentials'.
[my-creds]
aws_access_key_id: ######
aws_secret_access_key: ######
Step 2: Create a Delta table using the PySpark sample code below, unless you already have Delta tables on S3.
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from configparser import ConfigParser
import os
import uuid
import random

def generate_data():
    return [(y, m, d, str(uuid.uuid4()), str(random.randrange(10000) % 26 + 65) * 3, random.random()*10000)
            for d in range(1, 29)
            for m in range(1, 13)
            for y in range(2000, 2021)]

jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# ConfigParser does not expand $HOME, so resolve the home directory explicitly.
config_object = ConfigParser()
config_object.read(os.path.expanduser("~/.aws/credentials"))
profile_info = config_object["my-creds"]
access_id = profile_info["aws_access_key_id"]
access_key = profile_info["aws_secret_access_key"]

# Point the S3A filesystem at the static credentials read above.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)

table_path = "s3a://my-bucket/my-folder/sales-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)

df = spark.read.format("delta").load(table_path)
df.show()
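Note that the Spark code writes through the s3a:// scheme, while the recipe in this section uses an s3:// base_path for the same bucket and folder. A small sketch of that rewrite, using only the standard library; `to_recipe_base_path` is a hypothetical helper, not a DataHub function.

```python
from urllib.parse import urlsplit


def to_recipe_base_path(spark_path: str) -> str:
    """Rewrite a Spark-style s3a:// (or s3n://) URI to the s3:// form used in the recipe."""
    parts = urlsplit(spark_path)
    if parts.scheme not in ("s3", "s3a", "s3n"):
        raise ValueError(f"not an S3 URI: {spark_path!r}")
    # Keep bucket and key prefix unchanged; only the scheme differs.
    return f"s3://{parts.netloc}{parts.path}"


if __name__ == "__main__":
    print(to_recipe_base_path("s3a://my-bucket/my-folder/sales-table"))
```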
Step 3: Create a DataHub ingestion YAML file (delta.s3.dhub.yaml) to ingest metadata from the Delta table you just created.
source:
  type: "delta-lake"
  config:
    base_path: "s3://my-bucket/my-folder/sales-table"
    s3:
      aws_config:
        aws_access_key_id: <<Access key>>
        aws_secret_access_key: <<secret key>>
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Step 4: Execute the ingestion recipe:
datahub ingest -c delta.s3.dhub.yaml
The IAM principal used by the source needs the following S3 actions:
| Action | Required | Purpose |
|---|---|---|
| `s3:ListBucket` | Always | Auto-discovering table folders under `base_path` and listing entries under `_delta_log/` to find transaction log versions. |
| `s3:GetObject` | Always | Reading `_delta_log/*.json` and checkpoint parquet files to extract schema and table properties. |
| `s3:GetBucketTagging` | Only if `s3.use_s3_bucket_tags: true` | Fetching bucket tags to attach to the dataset. |
| `s3:GetObjectTagging` | Only if `s3.use_s3_object_tags: true` | Fetching object tags to attach to the dataset. |
If the bucket is encrypted with a customer-managed KMS key, the principal also needs kms:Decrypt on that key.
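To see why read access to `_delta_log/*.json` matters, the sketch below pulls a schema out of a local table's JSON commit files. It illustrates the Delta transaction log layout and is not the connector's implementation: it ignores checkpoint parquet files, and `latest_schema` is a hypothetical helper name.

```python
import json
from pathlib import Path
from typing import Optional


def latest_schema(table_dir: str) -> Optional[dict]:
    """Return the newest schema recorded in a local table's JSON commits, or None."""
    log_dir = Path(table_dir) / "_delta_log"
    # Commit files are named by zero-padded version, so a reverse name sort is newest first.
    commits = sorted(log_dir.glob("*.json"), reverse=True)
    for commit in commits:
        # Each commit file holds one JSON action per line.
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "metaData" in action:
                # schemaString is itself a JSON document describing the columns.
                return json.loads(action["metaData"]["schemaString"])
    return None
```

For a table on S3, the same reads become object listings and downloads, which is where the two "Always" actions in the table come from.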
A minimal policy scoped to a single bucket and prefix looks like this:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::your-bucket",
      "Condition": { "StringLike": { "s3:prefix": ["your/delta/prefix/*"] } }
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::your-bucket/your/delta/prefix/*"
    }
  ]
}
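If you manage several buckets or prefixes, the policy above can be generated rather than hand-edited. A minimal sketch, assuming a non-empty prefix; `build_policy` is a hypothetical helper, and the two tagging statements are added only when the matching `s3.use_s3_bucket_tags` or `s3.use_s3_object_tags` option is enabled in the recipe.

```python
import json


def build_policy(bucket: str, prefix: str, bucket_tags: bool = False, object_tags: bool = False) -> dict:
    """Build a read-only S3 policy scoped to one bucket and prefix."""
    prefix = prefix.strip("/")
    if not prefix:
        raise ValueError("this sketch expects a non-empty prefix")
    bucket_arn = f"arn:aws:s3:::{bucket}"
    objects_arn = f"{bucket_arn}/{prefix}/*"
    statements = [
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": bucket_arn,
            # Restrict listing to keys under the Delta prefix.
            "Condition": {"StringLike": {"s3:prefix": [f"{prefix}/*"]}},
        },
        {"Effect": "Allow", "Action": ["s3:GetObject"], "Resource": objects_arn},
    ]
    if bucket_tags:
        # Bucket tagging is a bucket-level action.
        statements.append({"Effect": "Allow", "Action": ["s3:GetBucketTagging"], "Resource": bucket_arn})
    if object_tags:
        # Object tagging is an object-level action.
        statements.append({"Effect": "Allow", "Action": ["s3:GetObjectTagging"], "Resource": objects_arn})
    return {"Version": "2012-10-17", "Statement": statements}


if __name__ == "__main__":
    print(json.dumps(build_policy("your-bucket", "your/delta/prefix"), indent=2))
```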
You can ingest Delta tables stored in Azure using abfss://, abfs://, az://, adl://, or Azure HTTPS paths.
Azure folder discovery reuses shared Azure Blob helpers from the Azure ingestion module.
As a result, recursive folder scanning requires static credentials in source.config.azure
(account_key, sas_token, or client_id + client_secret + tenant_id).
source:
  type: "delta-lake"
  config:
    base_path: "abfss://my-container@myaccount.dfs.core.windows.net/delta/sales"
    azure:
      account_key: ${AZURE_STORAGE_ACCOUNT_KEY}
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
source:
  type: "delta-lake"
  config:
    base_path: "az://my-container/delta/sales"
    azure:
      account_name: "myaccount"
      client_id: ${AZURE_CLIENT_ID}
      client_secret: ${AZURE_CLIENT_SECRET}
      tenant_id: ${AZURE_TENANT_ID}
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
If you use az:// or adl:// URIs, set azure.account_name explicitly.
If you use azure.credential (unified token-based auth), use a base_path that points directly
to a Delta table. Token-based credentials are not used for recursive folder listing.
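The Azure rules above can be checked before running a recipe. The sketch below encodes only what this section states: az:// and adl:// paths need `account_name`, and recursive folder scanning needs one of the static credential sets. `check_azure_config` and its `recursive` flag are hypothetical and not part of DataHub.

```python
from urllib.parse import urlsplit

# Any one of these sets counts as static credentials for folder discovery.
STATIC_CREDENTIAL_SETS = (
    {"account_key"},
    {"sas_token"},
    {"client_id", "client_secret", "tenant_id"},
)


def check_azure_config(base_path: str, azure: dict, recursive: bool = True) -> list:
    """Return a list of human-readable problems; an empty list means no issue was found."""
    problems = []
    present = {key for key, value in azure.items() if value}
    scheme = urlsplit(base_path).scheme
    if scheme in ("az", "adl") and "account_name" not in present:
        problems.append("az:// and adl:// paths need azure.account_name")
    has_static = any(required <= present for required in STATIC_CREDENTIAL_SETS)
    if recursive and not has_static:
        problems.append(
            "recursive folder scanning needs account_key, sas_token, "
            "or client_id + client_secret + tenant_id"
        )
    return problems
```

Pass `recursive=False` when base_path points directly at a single Delta table, which is the case described for azure.credential.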
The Delta Lake source does not provide connector-specific domain config.
To assign domains, use the dataset transformer simple_add_dataset_domain.
source:
  type: "delta-lake"
  config:
    base_path: "abfss://my-container@myaccount.dfs.core.windows.net/delta/sales"
transformers:
  - type: "simple_add_dataset_domain"
    config:
      domains:
        - ${DATAHUB_DOMAIN_URN}
For additional options (PATCH, replace_existing, multiple domains), see
Simple Add Dataset Domains.
The recipes above are minimal. Refer to the Config Details section for the full configuration. Module behavior is constrained by source APIs, permissions, and metadata exposed by the platform; refer to the capability notes for unsupported or conditional features.
If ingestion fails, validate credentials, permissions, connectivity, and scope filters first. Then review ingestion logs for source-specific errors and adjust configuration accordingly.