docs/design/jet/023-kafka-connect-source-connectors.md
|||
|---|---|
| Related Jira | HZ-1510 |
| Related Github issues | None |
| Document Status / Completeness | DRAFT |
| Requirement owner | Nandan Kidambi |
| Developer(s) | Łukasz Dziedziul |
| Quality Engineer | Ondřej Lukáš |
| Support Engineer | TBD |
| Technical Reviewers | František Hartman |
| Simulator or Soak Test PR(s) | TBD |
From the Confluent documentation:
> Kafka Connect, an open source component of Kafka, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. Using Kafka Connect you can use existing connector implementations for common data sources and sinks to move data into and out of Kafka.
We want to take existing Kafka Connect source connectors and use them as sources for Hazelcast Jet without the need for a Kafka deployment. Hazelcast Jet will drive the Kafka Connect connector and bring the data from external systems into the pipeline directly.
Revamp the existing implementation of the custom Hazelcast source connector so that it can load Kafka Connect source connectors and run them on the Jet machinery without the need for a Kafka cluster.
| Term | Definition |
|---|---|
| Kafka Source Connector | Kafka Connect component ingesting data from an external system into Kafka topics. |
| Kafka Sink Connector | Kafka Connect component writing data from Kafka topics to external systems. |
| Kafka Transforms | Kafka Connect component modifying data from a source connector before it is written to Kafka, and modifying data read from Kafka before it’s written to the sink connector. |
| Kafka Converter | Kafka Connect component serializing records from the source connector when storing them to the Kafka topic, and deserializing records read from the topic back into Kafka Connect's internal representation. |
Provide a list of functions user(s) can perform.
To use any Kafka Connect connector as a source in your pipeline, you need to create a source by calling the `KafkaConnectSources.connect()` method with a `Properties` object. After that, you can use it like any other source in the Jet pipeline.
The source will emit items of the `SourceRecord` type from the Kafka Connect API, where you can access the key and value along with their corresponding schemas. Hazelcast Jet will instantiate `tasks.max` tasks for the specified source across the cluster.
You need to make sure the source connector is available on the classpath, either by putting its JAR on the classpath of the members or by uploading the connector JAR as part of the job config.
Besides that, you need to provide the set of properties used by the connector. Some of them are common to all connectors, while others are connector-specific. Refer to the individual connector documentation for connector-specific configuration properties.
Only the `name` and `connector.class` properties are mandatory.
An example pipeline using the Kafka-based `RandomSourceConnector` to generate random numbers:

```java
Properties randomProperties = new Properties();
// mandatory properties
randomProperties.setProperty("name", "random-source-connector");
randomProperties.setProperty("connector.class", "sasakitoa.kafka.connect.random.RandomSourceConnector");
// common properties
randomProperties.setProperty("tasks.max", "1");
// connector-specific properties
randomProperties.setProperty("generator.class", "sasakitoa.kafka.connect.random.generator.RandomInt");
randomProperties.setProperty("messages.per.second", "1000");
randomProperties.setProperty("topic", "test");
randomProperties.setProperty("task.summary.enable", "true");

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(KafkaConnectSources.connect(randomProperties))
        .withoutTimestamps()
        .map(record -> Values.convertToString(record.valueSchema(), record.value()))
        .writeTo(AssertionSinks.assertCollectedEventually(60,
                list -> assertEquals(ITEM_COUNT, list.size())));

JobConfig jobConfig = new JobConfig();
jobConfig.addJar(Objects.requireNonNull(this.getClass()
                .getClassLoader()
                .getResource("random-connector-1.0-SNAPSHOT.jar"))
        .getPath());

Job job = createHazelcastInstance().getJet().newJob(pipeline, jobConfig);
```
No changes required
In the hazelcast-jet-contrib repository we already have an initial implementation of `KafkaConnectSources`.
`KafkaConnectSources` instantiates the `KafkaConnectSource`, which is then used to create a task object that performs the actual ingestion logic.
Everything is orchestrated by the `SourceBuilder` methods: `createFn`, `fillBufferFn`, `createSnapshotFn`, `restoreSnapshotFn`, `destroyFn`.
The current implementation uses `SourceBuilder` to create a stream source of `SourceRecord`.
`SourceRecord` is the Kafka Connect data type returned by Kafka source connectors for further processing.
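To illustrate how these pieces fit together, here is a minimal sketch of assembling such a source with `SourceBuilder`; the `ConnectorWrapper` type and its methods are hypothetical stand-ins for the contrib code, not the actual implementation:

```java
// Minimal sketch, assuming a hypothetical ConnectorWrapper that starts the
// Kafka Connect connector/task from the given Properties.
StreamSource<SourceRecord> source = SourceBuilder
        .stream("kafkaConnectSource", ctx -> new ConnectorWrapper(properties)) // createFn
        .<SourceRecord>fillBufferFn((wrapper, buffer) -> {
            // Drain whatever the connector task has polled so far.
            for (SourceRecord record : wrapper.poll()) {
                buffer.add(record);
            }
        })
        .createSnapshotFn(ConnectorWrapper::copyOffsets)   // save partition offsets
        .restoreSnapshotFn((wrapper, saved) -> wrapper.restoreOffsets(saved.get(0)))
        .destroyFn(ConnectorWrapper::stop)                 // stop the task and connector
        .build();
```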
The plan is to move the code to the main hazelcast repository and make it production-ready with the following steps:
There are also nice-to-have features:
- `TaskMaxProcessorMetaSupplier` evenly distributes the number of tasks specified in the `tasks.max` property across cluster members.
- It creates `TaskMaxProcessorSupplier` objects and assigns each a processor order number.
- The processor with order 0 is the master processor.
- The master processor distributes task assignments to all other processors via a Reliable Topic named `"__jet." + executionId` (see the sketch after this list).
- All processors subscribe to this Reliable Topic.
- A special listener is used to read only the last item if one is available, or to wait for the next item.
- A processor starts polling its Kafka Connect connector as soon as it receives its task assignment.
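As a rough illustration of this coordination step (the `TaskAssignment` type and the helper method are assumptions, not the final implementation):

```java
// Sketch only; the topic name follows the "__jet." + executionId convention above.
ITopic<TaskAssignment> topic = hazelcastInstance.getReliableTopic("__jet." + executionId);

// The master processor (order 0) publishes the task assignments once.
topic.publish(new TaskAssignment(processorOrderToTaskConfig));

// Every processor subscribes and starts polling once its assignment arrives.
topic.addMessageListener(message -> {
    TaskAssignment assignment = message.getMessageObject();
    startPollingIfAssigned(assignment); // hypothetical helper
});
```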
The Kafka Connect connectors driven by Jet participate in Jet's state snapshots, storing their state (e.g. partition offsets plus any metadata they need to recover/restart) in Jet. This way, when the job is restarted, they can recover their state and continue consuming from where they left off. Since implementations vary between Kafka Connect modules, each will behave differently on failure. Refer to the documentation of the Kafka Connect connector of your choice for detailed information.
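For example, the state a source task contributes to a snapshot is essentially the partition-to-offset map carried by each emitted `SourceRecord` (a sketch; the surrounding field and method names are illustrative):

```java
// Track the latest offset per source partition as records are emitted.
Map<Map<String, ?>, Map<String, ?>> partitionsToOffset = new HashMap<>();

void onRecordEmitted(SourceRecord record) {
    partitionsToOffset.put(record.sourcePartition(), record.sourceOffset());
}
// createSnapshotFn stores this map in the Jet snapshot; restoreSnapshotFn
// hands it back to the task so it can resume from where it left off.
```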
What components in Hazelcast need to change? How do they change?
How does this work in an on-prem deployment?
How about on AWS and Kubernetes, platform operator?
How does this work in Viridian Cloud clusters?
How does the change behave in mixed-version deployments? During a version upgrade? Which migrations are needed?
What are the possible interactions with other features or sub-systems inside Hazelcast? How does the behavior of other code change implicitly as a result of the changes outlined in the design document?
What are the edge cases? What are example uses or inputs that we think are uncommon but are still possible and thus need to be handled? How are these edge cases handled?
What are the effects of possible mistakes by other Hazelcast team members trying to use the feature in their own code? How does the change impact how they will troubleshoot things?
Mention alternatives, risks and assumptions. Why is this design the best in the space of possible designs? What other designs have been considered and what is the rationale for not choosing them?
Keep the implementation in the hazelcast-jet-contrib repository - but it is not as well tested as the main repo and is less visible.
The most common mistakes by other Hazelcast users I can think of are:
Does the change impact performance? How?
How is resource usage affected for “large” loads? For example, what do we expect to happen when there are 100000 items/entries? 100000 data structures? 1000000 concurrent operations?
Is the change affecting asynchronous / background subsystems?
If so, how can users and our team observe the run-time state via tracing? - TBD
Is usage of the new feature observable in telemetry? If so, mention where in the code telemetry counters or metrics would be added. - TBD
What might be the valuable metrics that could be shown for this feature in Management Center and/or the Viridian Control Plane? - number of events; check the metrics infrastructure; check Kafka Connect metrics and monitoring that we are missing
Should this feature be configured, enabled/disabled or managed from the Management Center? How do you think your change affects Management Center? - N/A
Does the feature require or allow runtime changes to the member configuration (XML/YAML/programmatic)? N/A
Which other inspection APIs exist?
Are there new APIs, or API changes (either internal or external)?
How would you document the new APIs? Include example usage.
Which principles did you apply to ensure the APIs are consistent with other related features / APIs? (Cross-reference other APIs that are similar or related, for comparison.)
Is the change visible to users of Hazelcast or operators who run Hazelcast clusters?
Are there any user experience (UX) changes needed as a result of this change?
Are the UX changes necessary or clearly beneficial? (Cross-reference the motivation section.)
Which principles did you apply to ensure the user experience (UX) is consistent with other related features? (Cross-reference other features that have related UX, for comparison.)
Is usage of the new feature observable in telemetry? If so, mention where in the code telemetry counters or metrics would be added.
Should this feature be configured, enabled/disabled or managed from the Management Center? How do you think your change affects Management Center?
Does the feature require or allow runtime changes to the member configuration (XML/YAML/programmatic)?
Are usage statistics for this feature reported in Phone Home? If not, why?
Describe the testing approach for the developed functionality.