Convert table to Iceberg

Setup

  1. Create a Spark session
  2. Add the iceberg-runtime Jar
scala
// evaluating the `spark` value creates (and displays) the notebook's Spark session
spark

scala
%AddJar file:///home/user/iceberg-runtime-0.1.3.jar

Drop and create a table in HDFS

Spark Schema Helpers

scala
import org.apache.hadoop.fs.Path
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil

val path = "hdfs:/tmp/tables/job_metrics_tmp"

{ // use a block to avoid values (conf, etc.) getting caught in closures

    // remove the temp table if it already exists
    val conf = spark.sessionState.newHadoopConf()
    val fs = new Path(path).getFileSystem(conf)
    fs.delete(new Path(path), true /* recursive */ )

    // create the temp table using Spark utils to create a schema and partition spec
    val tables = new HadoopTables(conf)
    val schema = SparkSchemaUtil.schemaForTable(spark, "default.job_metrics")
    val spec = SparkSchemaUtil.specForTable(spark, "default.job_metrics")

    tables.create(schema, spec, path)

    // show the schema
    tables.load(path).schema
}
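
Since the block above keeps `conf` and `tables` local, re-open the table to inspect it afterwards; for example, `Table.spec` shows the partition spec that was just derived from `default.job_metrics`:

scala
// re-open the new table and show its partition spec
val tables = new HadoopTables(spark.sessionState.newHadoopConf())
tables.load(path).spec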

Load table partitions as a DataFrame

scala
import org.apache.iceberg.spark.SparkTableUtil
import spark.implicits._ // for the $"colname" column syntax below

// get a data frame with the table's partitions
val partitions = SparkTableUtil.partitionDF(spark, "default.job_metrics")
                               .filter($"format".contains("parquet") || $"format".contains("avro"))

display(partitions.limit(10))
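
The conversion job in the next step indexes this DataFrame positionally, so it helps to confirm the column layout first. A minimal check, assuming `partitionDF` returns the partition map, location, and format as columns 0, 1, and 2 (which is what the accessors below rely on):

scala
// columns indexed positionally by the conversion job:
//   0: partition  (map of partition column -> value)
//   1: uri        (partition location)
//   2: format     (storage format of the partition's files)
partitions.printSchema()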

List files, load metrics, and append to the table

scala
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkTableUtil
import org.apache.hadoop.conf.Configuration

partitions.repartition(100).flatMap { row =>

    // list the partition and read Parquet footers to get metrics
    SparkTableUtil.listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))

}.repartition(10) // avoid lots of manifests that would be merged later
 .mapPartitions { files =>

    // open the table and append the files from this partition
    val tables = new HadoopTables(new Configuration())
    val table = tables.load("hdfs:/tmp/tables/job_metrics_tmp")

    // fast appends will create a manifest for the new files
    val append = table.newFastAppend

    files.foreach { file =>
        append.appendFile(file.toDataFile(table.spec))
    }

    // commit the new files
    append.commit()

    Seq.empty[String].iterator

}.count
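
To sanity-check the conversion, read the new table back through Spark. A minimal sketch, assuming this iceberg-runtime build registers its Spark data source under the short name `iceberg`:

scala
// read the converted table back; the count should match default.job_metrics
spark.read.format("iceberg")
     .load("hdfs:/tmp/tables/job_metrics_tmp")
     .count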

Inspect the results

Snapshot API

scala
val tables = new HadoopTables(spark.sessionState.newHadoopConf())
val table = tables.load(path)

table.currentSnapshot

scala
import scala.collection.JavaConverters._

table.currentSnapshot.addedFiles.asScala.size

scala
table.newAppend.commit // use a merge commit to create a single manifest

scala
table.currentSnapshot
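
To confirm that the merge commit collapsed the per-partition manifests, count the manifests in the current snapshot. A hedged check, assuming `Snapshot.manifests` is available in this release:

scala
// after the merge commit this should report a single manifest
table.currentSnapshot.manifests.asScala.size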