pinot-connectors/pinot-flink-connector/README.md
Flink connector to write data to Pinot directly. This is useful for backfilling or bootstrapping tables, including upsert tables. You can read more about the motivation and design in this design proposal.
Writing to an OFFLINE table:

```java
// Set up flink env and data source
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(2); // optional
DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO);

// Create a ControllerRequestClient to fetch the Pinot schema and table config
HttpClient httpClient = HttpClient.getInstance();
ControllerRequestClient client = new ControllerRequestClient(
    ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);

// Fetch the Pinot schema
Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores");
// Fetch the Pinot table config
TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "OFFLINE");

// Create the Flink Pinot sink
srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
execEnv.execute();
```
Writing to a REALTIME (upsert) table:

```java
// Set up flink env and data source
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(2); // mandatory for upsert tables
DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO);

// Create a ControllerRequestClient to fetch the Pinot schema and table config
HttpClient httpClient = HttpClient.getInstance();
ControllerRequestClient client = new ControllerRequestClient(
    ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);

// Fetch the Pinot schema
Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores");
// Fetch the Pinot table config
TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "REALTIME");

// Create the Flink Pinot sink. For upsert tables, partition the stream the same
// way as the realtime stream (e.g. Kafka) so records with the same primary key
// land on the same partition.
srcDs.partitionCustom((Partitioner<Integer>) (key, partitions) -> key % partitions, r -> (Integer) r.getField("primaryKey"))
    .addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
execEnv.execute();
```
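The custom partitioner in the upsert example must route each record to the same partition that its primary key maps to on the realtime stream; otherwise, upserts for the same key can be processed out of order. The snippet below is a plain-Java illustration of that `key % partitions` routing, with no Flink dependency (the class and method names are made up for the example):

```java
public class PartitionExample {
    // Same logic as the Partitioner lambda above: route a record by its
    // integer primary key, modulo the number of partitions.
    static int partitionFor(int primaryKey, int numPartitions) {
        return primaryKey % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // Records with the same primary key always map to the same partition,
        // which keeps upsert ordering consistent with the realtime stream.
        System.out.println(partitionFor(7, numPartitions)); // 3
        System.out.println(partitionFor(7, numPartitions)); // 3
        System.out.println(partitionFor(8, numPartitions)); // 0
    }
}
```

Note that this simple modulo scheme only matches the realtime stream if the stream producer partitions by the same integer key in the same way; verify your stream's partitioning strategy before relying on it.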
For more examples, please see `src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java`.
The sink buffers incoming records and flushes them into a Pinot segment once the number of buffered records reaches `segmentFlushMaxNumRecords`. In the future, we could add other types of thresholds, such as the memory usage of the buffer.
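The record-count threshold can be pictured as a buffer that flushes a segment whenever it fills up. The sketch below is a hypothetical illustration of that mechanism only, not the connector's actual implementation (class and method names are invented for the example):

```java
import java.util.ArrayList;
import java.util.List;

public class FlushBufferSketch {
    final int segmentFlushMaxNumRecords;
    final List<Object> buffer = new ArrayList<>();
    int segmentsFlushed = 0;

    FlushBufferSketch(int segmentFlushMaxNumRecords) {
        this.segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
    }

    // Buffer a record; flush a "segment" once the threshold is reached.
    void write(Object record) {
        buffer.add(record);
        if (buffer.size() >= segmentFlushMaxNumRecords) {
            flush();
        }
    }

    void flush() {
        // In the real connector this step would build and upload a Pinot segment.
        segmentsFlushed++;
        buffer.clear();
    }

    public static void main(String[] args) {
        FlushBufferSketch sink = new FlushBufferSketch(3);
        for (int i = 0; i < 10; i++) {
            sink.write("record-" + i);
        }
        // 10 records at a threshold of 3: three full segments flushed, one record still buffered.
        System.out.println(sink.segmentsFlushed); // 3
        System.out.println(sink.buffer.size());   // 1
    }
}
```

A memory-based threshold, as suggested above, would simply add a second condition to the `if` in `write`.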