Delta Lake Post

Capabilities

Use the Important Capabilities table above as the source of truth for supported features and whether additional configuration is required.

Delta Table on Local File System

Step 1

Create a delta table using the sample PySpark code below if you don't have a delta table you can point to.

python
import uuid
import random
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

def generate_data():
    # chr(65..90) yields an uppercase letter, so each customer is a code such as "AAA".
    return [(y, m, d, str(uuid.uuid4()), chr(random.randrange(10000) % 26 + 65) * 3, random.random()*10000)
    for d in range(1, 29)
    for m in range(1, 13)
    for y in range(2000, 2021)]

jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

table_path = "quickstart/my-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)

df = spark.read.format("delta").load(table_path)
df.show()

Step 2

Create a datahub ingestion yaml file (delta.dhub.yaml) to ingest metadata from the delta table you just created.

yaml
source:
  type: "delta-lake"
  config:
    base_path: "quickstart/my-table"

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Note: Run the Spark code and the recipe from the same folder; otherwise, use absolute paths.

Step 3

Execute the ingestion recipe:

shell
datahub ingest -c delta.dhub.yaml
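
The same recipe can also be run from Python, which makes it easy to resolve base_path to an absolute path and avoid the working-directory caveat noted in Step 2. The following is a minimal sketch, assuming the acryl-datahub package with the delta-lake plugin is installed. build_recipe and run_recipe are hypothetical helper names used for illustration; they are not part of DataHub.

```python
from pathlib import Path


def build_recipe(table_path: str, server: str = "http://localhost:8080") -> dict:
    """Return the Step 2 recipe as a dict, with base_path made absolute."""
    return {
        "source": {
            "type": "delta-lake",
            "config": {"base_path": str(Path(table_path).resolve())},
        },
        "sink": {"type": "datahub-rest", "config": {"server": server}},
    }


def run_recipe(recipe: dict) -> None:
    """Run the recipe in-process and raise if the pipeline reports failures."""
    # Imported lazily so build_recipe can be used without DataHub installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()


# Example (requires a reachable DataHub server):
# run_recipe(build_recipe("quickstart/my-table"))
```

Because the path is resolved when the recipe is built, the script can be started from any folder as long as the relative path is correct at that moment.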

Delta Table on S3

Step 1

Set up your AWS credentials by creating an AWS credentials file, typically at $HOME/.aws/credentials:

[my-creds]
aws_access_key_id: ######
aws_secret_access_key: ######

Step 2

Create a Delta table using the sample PySpark code below, unless you already have Delta tables on S3.

python
import os
import random
import uuid
from configparser import ConfigParser

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

def generate_data():
    # chr(65..90) yields an uppercase letter, so each customer is a code such as "AAA".
    return [(y, m, d, str(uuid.uuid4()), chr(random.randrange(10000) % 26 + 65) * 3, random.random()*10000)
    for d in range(1, 29)
    for m in range(1, 13)
    for y in range(2000, 2021)]

jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# ConfigParser.read() does not expand "$HOME" and silently skips files it cannot open,
# so build the real path to the credentials file before reading it.
config_object = ConfigParser()
config_object.read(os.path.expanduser("~/.aws/credentials"))
profile_info = config_object["my-creds"]
access_id = profile_info["aws_access_key_id"]
access_key = profile_info["aws_secret_access_key"]

hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)

table_path = "s3a://my-bucket/my-folder/sales-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)
df = spark.read.format("delta").load(table_path)
df.show()

Step 3

Create a datahub ingestion yaml file (delta.s3.dhub.yaml) to ingest metadata from the delta table you just created.

yaml
source:
  type: "delta-lake"
  config:
    base_path: "s3://my-bucket/my-folder/sales-table"
    s3:
      aws_config:
        aws_access_key_id: <<Access key>>
        aws_secret_access_key: <<secret key>>

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Step 4

Execute the ingestion recipe:

shell
datahub ingest -c delta.s3.dhub.yaml

Required IAM Permissions

The IAM principal used by the source needs the following S3 actions:

| Action | Required | Purpose |
| --- | --- | --- |
| s3:ListBucket | Always | Auto-discovering table folders under base_path and listing entries under _delta_log/ to find transaction log versions. |
| s3:GetObject | Always | Reading _delta_log/*.json and checkpoint parquet files to extract schema and table properties. |
| s3:GetBucketTagging | Only if s3.use_s3_bucket_tags: true | Fetching bucket tags to attach to the dataset. |
| s3:GetObjectTagging | Only if s3.use_s3_object_tags: true | Fetching object tags to attach to the dataset. |

If the bucket is encrypted with a customer-managed KMS key, the principal also needs kms:Decrypt on that key.
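
The conditional rows above can be summarized as a small lookup. This is an illustrative sketch only: required_s3_actions is a hypothetical helper, not something the connector exposes, and the kms_encrypted flag is an assumed name for "the bucket uses a customer-managed KMS key".

```python
from typing import List


def required_s3_actions(
    use_s3_bucket_tags: bool = False,
    use_s3_object_tags: bool = False,
    kms_encrypted: bool = False,
) -> List[str]:
    """List the IAM actions needed for a given combination of recipe options."""
    # These two are always required.
    actions = ["s3:ListBucket", "s3:GetObject"]
    if use_s3_bucket_tags:
        actions.append("s3:GetBucketTagging")
    if use_s3_object_tags:
        actions.append("s3:GetObjectTagging")
    if kms_encrypted:
        # Applies to the KMS key, not to the bucket.
        actions.append("kms:Decrypt")
    return actions


print(required_s3_actions(use_s3_bucket_tags=True))
```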

A minimal policy scoped to a single bucket and prefix looks like this:

json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::your-bucket",
      "Condition": { "StringLike": { "s3:prefix": ["your/delta/prefix/*"] } }
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::your-bucket/your/delta/prefix/*"
    }
  ]
}
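
When several tables live under different prefixes, it can be convenient to generate this policy rather than edit it by hand. The sketch below reproduces the JSON above for an arbitrary bucket and prefix; build_policy is a hypothetical helper name, and the output should still be reviewed before it is attached to a principal.

```python
import json


def build_policy(bucket: str, prefix: str) -> dict:
    """Build the minimal read-only policy for one bucket and one prefix."""
    prefix = prefix.strip("/")
    bucket_arn = f"arn:aws:s3:::{bucket}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing is granted on the bucket and narrowed by prefix.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": bucket_arn,
                "Condition": {"StringLike": {"s3:prefix": [f"{prefix}/*"]}},
            },
            {
                # Reads are granted on the objects under the prefix.
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"{bucket_arn}/{prefix}/*",
            },
        ],
    }


print(json.dumps(build_policy("your-bucket", "your/delta/prefix"), indent=2))
```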

Delta Table on Azure Data Lake Storage (ADLS Gen2 / Blob)

You can ingest Delta tables stored in Azure using abfss://, abfs://, az://, adl://, or Azure HTTPS paths.

Azure folder discovery reuses shared Azure Blob helpers from the Azure ingestion module. As a result, recursive folder scanning requires static credentials in source.config.azure (account_key, sas_token, or client_id + client_secret + tenant_id).

Example using account key

yaml
source:
  type: "delta-lake"
  config:
    base_path: "abfss://<container>@<storage-account>.dfs.core.windows.net/delta/sales"
    azure:
      account_key: ${AZURE_STORAGE_ACCOUNT_KEY}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Example using service principal

yaml
source:
  type: "delta-lake"
  config:
    base_path: "az://my-container/delta/sales"
    azure:
      account_name: "myaccount"
      client_id: ${AZURE_CLIENT_ID}
      client_secret: ${AZURE_CLIENT_SECRET}
      tenant_id: ${AZURE_TENANT_ID}

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

If you use az:// or adl:// URIs, set azure.account_name explicitly.

If you use azure.credential (unified token-based auth), set base_path to a path that points directly to a Delta table. The token-based credential is not used for recursive folder listing.
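
The Azure rules in this section can be checked before running ingestion. The following is a rough sketch of such a pre-flight check. check_azure_config is a hypothetical helper, the field names are the ones mentioned in this section, and the logic only mirrors the prose here; it does not reflect how the connector validates its configuration internally.

```python
from typing import List

# Any one of these complete sets counts as a static credential.
STATIC_CREDENTIAL_SETS = [
    {"account_key"},
    {"sas_token"},
    {"client_id", "client_secret", "tenant_id"},
]


def check_azure_config(base_path: str, azure: dict) -> List[str]:
    """Return a list of problems; an empty list means these checks passed."""
    problems = []
    present = {key for key, value in azure.items() if value}
    if base_path.startswith(("az://", "adl://")) and "account_name" not in present:
        problems.append("az:// and adl:// paths need azure.account_name")
    if not any(required <= present for required in STATIC_CREDENTIAL_SETS):
        problems.append(
            "no static credential found: recursive folder scanning is unavailable, "
            "so base_path must point directly to a Delta table"
        )
    return problems


print(check_azure_config("az://my-container/delta/sales", {"account_key": "..."}))
```

The example call reports the missing account_name, because the path uses the az:// scheme.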

Assigning domains with a transformer

The Delta Lake source does not provide connector-specific domain configuration. To assign domains, use the dataset transformer simple_add_dataset_domain.

yaml
source:
  type: "delta-lake"
  config:
    base_path: "abfss://<container>@<storage-account>.dfs.core.windows.net/delta/sales"

transformers:
  - type: "simple_add_dataset_domain"
    config:
      domains:
        - ${DATAHUB_DOMAIN_URN}

For additional options (PATCH, replace_existing, multiple domains), see Simple Add Dataset Domains.
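
Recipes like the one above rely on environment variables such as DATAHUB_DOMAIN_URN being set when ingestion runs. The sketch below lists the variables a recipe references that are currently unset. missing_env_vars is a hypothetical helper, and it only recognizes plain ${NAME} references; any other substitution syntax is ignored.

```python
import os
import re
from typing import List, Mapping

# Matches plain ${NAME} references only.
ENV_REF = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def missing_env_vars(recipe_text: str, env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the sorted names of referenced variables that are unset or empty."""
    names = sorted(set(ENV_REF.findall(recipe_text)))
    return [name for name in names if not env.get(name)]


recipe_text = """
transformers:
  - type: "simple_add_dataset_domain"
    config:
      domains:
        - ${DATAHUB_DOMAIN_URN}
"""
print(missing_env_vars(recipe_text, env={}))
```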

Limitations

The recipes above are minimal. Refer to the Config Details section for the full configuration. Module behavior is constrained by source APIs, permissions, and the metadata exposed by the platform; refer to the capability notes for unsupported or conditional features.

Troubleshooting

If ingestion fails, validate credentials, permissions, connectivity, and scope filters first. Then review ingestion logs for source-specific errors and adjust configuration accordingly.
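
For a table on the local file system, one quick scope check is to confirm that base_path actually contains a Delta transaction log. The following is a minimal sketch. looks_like_delta_table is a hypothetical helper that only checks for JSON files under _delta_log/, which is a rough heuristic and not the connector's own detection logic.

```python
from pathlib import Path


def looks_like_delta_table(path: str) -> bool:
    """Return True if the folder has a _delta_log directory with at least one JSON file."""
    log_dir = Path(path) / "_delta_log"
    return log_dir.is_dir() and any(log_dir.glob("*.json"))


print(looks_like_delta_table("quickstart/my-table"))
```

If this returns False for the path used in the recipe, the path is probably wrong or relative to a different working directory.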