docs/content/stable/integrations/apache-spark/java-ysql.md
The following tutorial describes how to use Apache Spark in a Java program with YugabyteDB to read and write data using the YugabyteDB JDBC driver.
This tutorial assumes that you have:

- YugabyteDB up and running.
- Java Development Kit (JDK) 1.8 or later installed. Homebrew users on macOS can install it using brew install AdoptOpenJDK/openjdk/adoptopenjdk8.

Create the database and table you will read and write to as follows:
From your YugabyteDB installation directory, use ysqlsh to read and write directly to the database as follows:
./bin/ysqlsh
Create a database ysql_spark and connect to it using the following:
yugabyte=# CREATE DATABASE ysql_spark;
yugabyte=# \c ysql_spark;
You are now connected to database "ysql_spark" as user "yugabyte".
ysql_spark=#
Create a table in the ysql_spark database to read and write data through the JDBC connector as follows:
ysql_spark=# CREATE TABLE test AS SELECT generate_series(1,100000) AS id, random(), ceil(random() * 20);
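Alternatively, if you prefer to create the table from Java rather than ysqlsh, the following is a minimal JDBC sketch. It assumes the YugabyteDB JDBC driver is on your classpath, the ysql_spark database already exists, and the default yugabyte credentials; the class name CreateTestTable is illustrative only.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTestTable {
    public static void main(String[] args) throws Exception {
        // Connects to the ysql_spark database created above using the
        // default yugabyte/yugabyte credentials (adjust for your cluster)
        String url = "jdbc:yugabytedb://localhost:5433/ysql_spark";
        try (Connection conn = DriverManager.getConnection(url, "yugabyte", "yugabyte");
             Statement stmt = conn.createStatement()) {
            // Same table definition as the ysqlsh step above
            stmt.execute("CREATE TABLE IF NOT EXISTS test AS "
                    + "SELECT generate_series(1,100000) AS id, random(), ceil(random() * 20)");
        }
    }
}
```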
Create a project using the following Maven command:
$ mvn archetype:generate -DgroupId=com.yugabyte -DartifactId=sparkSample -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
$ cd sparkSample
Open the pom.xml file in a text editor and add the following dependencies for Apache Spark and the YugabyteDB JDBC driver:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.4</version>
</dependency>
<dependency>
<groupId>com.yugabyte</groupId>
<artifactId>jdbc-yugabytedb</artifactId>
<version>{{< version-driver-java >}}</version>
</dependency>
<dependency>
<groupId>com.yugabyte</groupId>
<artifactId>spark-yugabytedb-dialect_2.12</artifactId>
<version>3.5.4-yb-1</version>
</dependency>
You must add the spark-yugabytedb-dialect_2.12 dependency to use the YugabyteDB JDBC Smart driver in your Spark application. This dialect is based on the PostgreSQL dialect and differs only in the URL prefix it handles: jdbc:yugabytedb: instead of jdbc:postgresql:. (You don't need this dialect dependency to use the PostgreSQL driver in your Spark application.) For more information, refer to the YugabyteDB Dialect for Apache Spark project.
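To make the difference concrete, here is a minimal sketch contrasting the two URL prefixes. The import path for YugabyteDBDialect is inferred from the dialect dependency above; verify it against the YugabyteDB Dialect for Apache Spark project.

```java
import com.yugabyte.spark.YugabyteDBDialect; // assumed package; verify against the dialect project

public class DialectUrlExample {
    public static void main(String[] args) {
        // PostgreSQL driver: Spark's built-in PostgreSQL dialect handles
        // this prefix, so no extra registration is needed.
        String postgresUrl = "jdbc:postgresql://localhost:5433/ysql_spark";

        // YugabyteDB JDBC Smart driver: register the YugabyteDB dialect
        // once per JVM, then use the jdbc:yugabytedb: prefix.
        YugabyteDBDialect.register();
        String yugabyteUrl = "jdbc:yugabytedb://localhost:5433/ysql_spark";

        System.out.println(postgresUrl);
        System.out.println(yugabyteUrl);
    }
}
```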
Install the added dependencies:
$ mvn install
Create a Java file named SparkSQLJavaExample.java under the src/main/java directory of your sparkSample project and add the following code to the file:
import com.yugabyte.spark.YugabyteDBDialect; // from the spark-yugabytedb-dialect dependency (verify the package against the dialect project)
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Properties;
public class SparkSQLJavaExample {
public static void main(String[] args) {
// Register the YugabyteDB Dialect. Needed for using the YugabyteDB JDBC Smart driver
YugabyteDBDialect.register();
//Create the Spark session
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate();
//Connection URL
String jdbcUrl = "jdbc:yugabytedb://localhost:5433/ysql_spark";
Properties connectionProperties = new Properties();
connectionProperties.put("user", "yugabyte");
connectionProperties.put("password", "yugabyte");
//Create the DataFrame to read the data from the database table test
Dataset<Row> df_test = spark.read()
.jdbc(jdbcUrl, "public.test", connectionProperties);
//Print the schema of the DataFrame
df_test.printSchema();
/*
The output will be similar to the following:
root
|-- id: integer (nullable = true)
|-- random: double (nullable = true)
|-- ceil: double (nullable = true)
*/
//Read some data through the DataFrame APIs
df_test.select("id","ceil").groupBy("ceil").sum("id").limit(5).show();
/*
The output will be similar to the following:
+----+---------+
|ceil| sum(id)|
+----+---------+
| 8.0|249126014|
| 7.0|252286019|
|18.0|240967395|
| 1.0|249119602|
| 4.0|247163696|
+----+---------+
*/
/*
Rename the "ceil" column of the table "test" to "round_off" in the
DataFrame, then create a new table named test_copy with the modified
schema and insert all the DataFrame's data into it through the JDBC
connector.
*/
df_test.createOrReplaceTempView("test");
spark.table("test")
.withColumnRenamed("ceil", "round_off")
.write()
.jdbc(jdbcUrl, "test_copy", connectionProperties);
//Create the DataFrame to read data from the database table test_copy
Dataset<Row> df_test_copy = spark.read()
.jdbc(jdbcUrl, "public.test_copy", connectionProperties);
//Print the schema of the DataFrame
df_test_copy.printSchema();
/*
The output will be similar to the following:
root
|-- id: integer (nullable = true)
|-- random: double (nullable = true)
|-- round_off: double (nullable = true)
*/
/*
The following code creates a DataFrame for the table "test" with options
that control the parallelism of the read:
1. numPartitions - divides the read into numPartitions parallel tasks.
2. lowerBound - minimum value of the partitionColumn in the table.
3. upperBound - maximum value of the partitionColumn in the table.
4. partitionColumn - the column on which the read is partitioned.
Together, these options split the read into `numPartitions` parallel tasks
over the range [lowerBound, upperBound] of the `partitionColumn`.
5. pushDownPredicate - optimizes the query by pushing filters down to
YugabyteDB using the JDBC connector.
6. pushDownAggregate - optimizes the query by pushing aggregates down to
YugabyteDB using the JDBC connector.
These two options optimize SQL queries executed on this DataFrame: any
filters or aggregate functions in those queries are pushed down to
YugabyteDB using the JDBC connector.
*/
Dataset<Row> new_df_test = spark.read()
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "test")
.option("user", "yugabyte")
.option("password", "yugabyte")
.option("driver", "com.yugabyte.Driver")
.option("load-balance", "true")
.option("numPartitions", 5)
.option("partitionColumn", "ceil")
.option("lowerBound", 0)
.option("upperBound", 20)
.option("pushDownPredicate", true)
.option("pushDownAggregate", true)
.load();
new_df_test.createOrReplaceTempView("test");
spark.sql("select sum(ceil) from test where id > 50000").show();
/*
The output will be similar to the following:
+---------+
|sum(ceil)|
+---------+
| 525124.0|
+---------+
*/
spark.stop();
}
}
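To check that these options are taking effect, you can add a couple of lines inside main(), just before spark.stop(). The following is a minimal sketch that reuses the new_df_test DataFrame and spark session from the example above; the exact plan output depends on your Spark version.

```java
// Should print 5, matching the numPartitions option above
System.out.println("Partitions: " + new_df_test.javaRDD().getNumPartitions());

// The extended physical plan shows which filters (and, where supported,
// aggregates) were pushed down to YugabyteDB through the JDBC connector
spark.sql("select sum(ceil) from test where id > 50000").explain(true);
```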
Compile the project:
$ mvn compile
Run the application using the following command and verify the output against the expected results shown in the comments in SparkSQLJavaExample.java:
$ mvn exec:java -Dexec.mainClass="SparkSQLJavaExample"