pinot-connectors/pinot-spark-3-connector/README.md
Spark-pinot connector to read data from Pinot.
Detailed read model documentation is here; Spark-Pinot Connector Read Model
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("spark-pinot-connector-test")
.master("local")
.getOrCreate()
import spark.implicits._
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.load()
.filter($"DestStateName" === "Florida")
data.show(100)
You can secure both HTTP and gRPC using a unified switch or explicit flags.
secureMode=true to enable HTTPS and gRPC TLS together (recommended)useHttps for REST and grpc.use-plain-text=false for gRPC// Unified secure mode (enables HTTPS + gRPC TLS by default)
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("secureMode", "true")
.load()
// Explicit HTTPS only (gRPC remains plaintext by default)
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("useHttps", "true")
.load()
// Explicit gRPC TLS only (REST remains HTTP by default)
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("grpc.use-plain-text", "false")
.load()
When HTTPS is enabled (either via secureMode=true or useHttps=true), you can configure keystore/truststore as needed:
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("useHttps", "true")
.option("keystorePath", "/path/to/keystore.jks")
.option("keystorePassword", "keystorePassword")
.option("truststorePath", "/path/to/truststore.jks")
.option("truststorePassword", "truststorePassword")
.load()
| Option | Description | Required | Default |
|---|---|---|---|
secureMode | Unified switch to enable HTTPS and gRPC TLS | No | false |
useHttps | Enable HTTPS connections (overrides secureMode for REST) | No | false |
keystorePath | Path to client keystore file (JKS format) | No | None |
keystorePassword | Password for the keystore | No | None |
truststorePath | Path to truststore file (JKS format) | No | None |
truststorePassword | Password for the truststore | No | None |
Note: If no truststore is provided when HTTPS is enabled, the connector will trust all certificates (not recommended for production use).
The connector supports custom authentication headers for secure access to Pinot clusters:
// Using Bearer token authentication
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("authToken", "my-jwt-token") // Automatically adds "Authorization: Bearer my-jwt-token"
.load()
// Using custom authentication header
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("authHeader", "Authorization")
.option("authToken", "Bearer my-custom-token")
.load()
// Using API key authentication
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("authHeader", "X-API-Key")
.option("authToken", "my-api-key")
.load()
| Option | Description | Required | Default |
|---|---|---|---|
authHeader | Custom authentication header name | No | Authorization (when authToken is provided) |
authToken | Authentication token/value | No | None |
Note: If only authToken is provided without authHeader, the connector will automatically use Authorization: Bearer <token>.
The connector supports Pinot Proxy for secure cluster access where the proxy is the only exposed endpoint. When proxy is enabled, all HTTP requests to controllers/brokers and gRPC requests to servers are routed through the proxy.
// Basic proxy configuration
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("controller", "pinot-proxy:8080") // Proxy endpoint
.option("proxy.enabled", "true")
.load()
// Proxy with authentication
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("controller", "pinot-proxy:8080")
.option("proxy.enabled", "true")
.option("authToken", "my-proxy-token")
.load()
// Proxy with gRPC configuration
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("controller", "pinot-proxy:8080")
.option("proxy.enabled", "true")
.option("grpc.proxy-uri", "pinot-proxy:8094") // gRPC proxy endpoint
.load()
| Option | Description | Required | Default |
|---|---|---|---|
proxy.enabled | Use Pinot Proxy for controller and broker requests | No | false |
Note: When proxy is enabled, the connector adds FORWARD_HOST and FORWARD_PORT headers to route requests to the actual Pinot services.
The connector supports comprehensive gRPC configuration for secure and optimized communication with Pinot servers.
// Basic gRPC configuration
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("grpc.port", "8091")
.option("grpc.max-inbound-message-size", "256000000") // 256MB
.load()
// gRPC with TLS (explicit)
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("grpc.use-plain-text", "false")
.option("grpc.tls.keystore-path", "/path/to/grpc-keystore.jks")
.option("grpc.tls.keystore-password", "keystore-password")
.option("grpc.tls.truststore-path", "/path/to/grpc-truststore.jks")
.option("grpc.tls.truststore-password", "truststore-password")
.load()
// gRPC with proxy
val data = spark.read
.format("pinot")
.option("table", "airlineStats")
.option("tableType", "offline")
.option("proxy.enabled", "true")
.option("grpc.proxy-uri", "pinot-proxy:8094")
.load()
| Option | Description | Required | Default |
|---|---|---|---|
grpc.port | Pinot gRPC port | No | 8090 |
grpc.max-inbound-message-size | Max inbound message bytes when init gRPC client | No | 128MB |
grpc.use-plain-text | Use plain text for gRPC communication (overrides secureMode for gRPC) | No | true |
grpc.tls.keystore-type | TLS keystore type for gRPC connection | No | JKS |
grpc.tls.keystore-path | TLS keystore file location for gRPC connection | No | None |
grpc.tls.keystore-password | TLS keystore password | No | None |
grpc.tls.truststore-type | TLS truststore type for gRPC connection | No | JKS |
grpc.tls.truststore-path | TLS truststore file location for gRPC connection | No | None |
grpc.tls.truststore-password | TLS truststore password | No | None |
grpc.tls.ssl-provider | SSL provider | No | JDK |
grpc.proxy-uri | Pinot Rest Proxy gRPC endpoint URI | No | None |
Note: When using gRPC with proxy, the connector automatically adds FORWARD_HOST and FORWARD_PORT metadata headers for proper request routing.
There are more examples included in src/test/scala/.../ExampleSparkPinotConnectorTest.scala.
You can run the examples locally (e.g. using your IDE) in standalone mode by starting a local Pinot cluster. See: https://docs.pinot.apache.org/basics/getting-started/running-pinot-locally
You can also run the tests in cluster mode using following command:
export SPARK_CLUSTER=<YOUR_YARN_OR_SPARK_CLUSTER>
# Edit the ExampleSparkPinotConnectorTest to get rid of `.master("local")` and rebuild the jar before running this command
spark-submit \
--class org.apache.pinot.connector.spark.v3.datasource.ExampleSparkPinotConnectorTest \
--jars ./target/pinot-spark-3-connector-0.13.0-SNAPSHOT-shaded.jar \
--master $SPARK_CLUSTER \
--deploy-mode cluster \
./target/pinot-spark-3-connector-0.13.0-SNAPSHOT-tests.jar
Spark-Pinot connector uses Spark DatasourceV2 API. Please check the Databricks presentation for DatasourceV2 API;
BIG_DECIMAL values are mapped to Spark Decimal(38,18) with HALF_UP rounding to match the declared schema. Ensure your data fits this precision/scale or cast accordingly in Spark.
Ensure proxy endpoints are properly secured
Monitor proxy health and performance
Implement proper request routing and load balancing
Use authentication for proxy access