examples/Convert table to Iceberg.ipynb
spark
%AddJar file:///home/user/iceberg-runtime-0.1.3.jar
import org.apache.hadoop.fs.Path
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil
val path = "hdfs:/tmp/tables/job_metrics_tmp"
{ // use a block to avoid values (conf, etc.) getting caught in closures
  // remove the temp table if it already exists
  val conf = spark.sessionState.newHadoopConf()
  val fs = new Path(path).getFileSystem(conf)
  fs.delete(new Path(path), true /* recursive */)

  // create the temp table using Spark utils to create a schema and partition spec
  val tables = new HadoopTables(conf)
  val schema = SparkSchemaUtil.schemaForTable(spark, "default.job_metrics")
  val spec = SparkSchemaUtil.specForTable(spark, "default.job_metrics")
  tables.create(schema, spec, path)

  // show the schema
  tables.load(path).schema
}
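As a quick sanity check, the derived partition spec can be inspected by loading the table again; a small sketch reusing the session's Hadoop configuration:
// show the partition spec derived from default.job_metrics
new HadoopTables(spark.sessionState.newHadoopConf()).load(path).spec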
import org.apache.iceberg.spark.SparkTableUtil
import spark.implicits._ // for the $"column" syntax and case class encoders used below
// get a data frame with the table's partitions
val partitions = SparkTableUtil.partitionDF(spark, "default.job_metrics")
    .filter($"format".contains("parquet") || $"format".contains("avro"))
display(partitions.limit(10))
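The conversion step below reads the partition map, location, and format by position (columns 0, 1, and 2), so it helps to confirm the DataFrame's column order first:
// confirm the column order relied on by the positional accessors below
partitions.printSchema()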
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.hadoop.conf.Configuration
partitions.repartition(100).flatMap { row =>
    // list the partition and read Parquet footers to get metrics
    SparkTableUtil.listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
  }
  .repartition(10) // avoid lots of manifests that would be merged later
  .mapPartitions { files =>
    // open the table and append the files from this partition
    val tables = new HadoopTables(new Configuration())
    val table = tables.load("hdfs:/tmp/tables/job_metrics_tmp")

    // fast appends will create a manifest for the new files
    val append = table.newFastAppend

    files.foreach { file =>
      append.appendFile(file.toDataFile(table.spec))
    }

    // commit the new files
    append.commit()

    Seq.empty[String].iterator
  }.count
val tables = new HadoopTables(spark.sessionState.newHadoopConf())
val table = tables.load(path)
table.currentSnapshot
import scala.collection.JavaConverters._
table.currentSnapshot.addedFiles.asScala.size
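Each mapPartitions task above committed its own fast append, so the table now carries one snapshot per task commit; a quick sketch of counting them, assuming the table API exposes the snapshot log as snapshots:
// count the snapshots created by the per-partition fast appends
table.snapshots.asScala.size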
table.newAppend.commit // use a merge commit to create a single manifest
table.currentSnapshot
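To verify the conversion, read the new table back through Spark's data source API and compare it with the source table; a minimal sketch, assuming the iceberg-runtime jar added above registers the iceberg format for path-based Hadoop tables:
// read the converted table back and compare row counts with the source table
val converted = spark.read.format("iceberg").load(path)
converted.count
spark.table("default.job_metrics").count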