docs/design/jet/022-mongodb-connector.md
| | |
|---|---|
| Related Jira | https://hazelcast.atlassian.net/browse/HZ-1508 |
| Related Github issues | None |
| Document Status / Completeness | DRAFT |
| Requirement owner | TBD |
| Developer(s) | Tomasz Gawęda |
| Quality Engineer | TBD |
| Support Engineer | TBD |
| Technical Reviewers | Frantisek Hartman |
| Simulator or Soak Test PR(s) | TBD |
| Term | Definition |
|---|---|
| Jet connector | A set of classes that can be used as a Hazelcast Jet source and/or sink. |
| SQL Connector | A set of classes (one implementing `SqlConnector`) that adds the ability to use a given source/sink in SQL queries. |
MongoDB is a widely used NoSQL, document-based database. Although there was a Jet sink and source connector implemented in the hazelcast-jet-contrib repository, users didn't have a properly tested and supported connector for this database.

Create a connector that:
- MapStore feature

It's not a goal of this connector to control MongoDB settings; any changes in MongoDB configuration or any administration action will still need to be done on the MongoDB side.
TBD
- type in SQL mappings
- null in case of lack of values
Example usage of the source:
```java
BatchSource<Document> batchSource =
        MongoSources.batch(
                "batch-source",
                "mongodb://127.0.0.1:27017",
                "myDatabase",
                "myCollection",
                new Document("age", new Document("$gt", 10)),
                new Document("age", 1)
        );
Pipeline p = Pipeline.create();
BatchStage<Document> srcStage = p.readFrom(batchSource);
```
This example queries documents in a collection having the field `age` with a value greater than 10 and applies a projection so that only the `age` field is returned in the emitted documents. The user should also be able to add e.g. `MyUser.class` as the last argument, which will mean that all returned objects will be automatically mapped to a Java POJO (see MongoDB docs).
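A minimal sketch of what such automatic POJO mapping would do, assuming a `MyUser.class` argument. A plain `Map` stands in here for the BSON `Document`, and the class and field names are illustrative, not the connector's real API:

```java
import java.util.Map;

public class PojoMapping {
    /** Illustrative POJO that the source would map documents onto. */
    static class MyUser {
        final String name;
        final int age;
        MyUser(String name, int age) { this.name = name; this.age = age; }
    }

    /** What the connector would do internally when MyUser.class is passed. */
    static MyUser toMyUser(Map<String, Object> document) {
        return new MyUser(
                (String) document.get("name"),
                (Integer) document.get("age"));
    }

    public static void main(String[] args) {
        MyUser user = toMyUser(Map.<String, Object>of("name", "Jan", "age", 42));
        System.out.println(user.name + ", " + user.age); // Jan, 42
    }
}
```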
Example usage of the sink:

```java
Pipeline p = Pipeline.create();
p.readFrom(Sources.list(list))
 .map(i -> new Document("key", i))
 .writeTo(
         MongoSinks.mongodb(
                 "sink",
                 "mongodb://localhost:27017",
                 "myDatabase",
                 "myCollection"
         )
 );
```
The user should be able to run the following SQL query:

```sql
CREATE MAPPING people
DATA CONNECTION "mongodb-ref"
```
In such cases, automatic schema inference will be used. The user may also want to provide the schema explicitly:
```sql
CREATE MAPPING people (
    firstName VARCHAR(100),
    lastName VARCHAR(100),
    age INT
)
DATA CONNECTION "mongodb-ref"
```
The user will be able to configure a map store:

```java
Config config = new Config();
config.addDataConnectionConfig(
        new DataConnectionConfig("mongodb-ref")
                .setType("MongoDB")
                .setProperty("connectionString", dbConnectionUrl)
);
MapConfig mapConfig = new MapConfig(mapName);
MapStoreConfig mapStoreConfig = new MapStoreConfig();
mapStoreConfig.setClassName(GenericMapStore.class.getName());
mapStoreConfig.setProperty(OPTION_DATA_CONNECTION_REF, "mongodb-ref");
mapConfig.setMapStoreConfig(mapStoreConfig);
instance().getConfig().addMapConfig(mapConfig);
```
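Conceptually, `GenericMapStore` then delegates map loads and stores through the data connection to the mapped collection. A toy sketch of that contract, where an in-memory `Map` stands in for the MongoDB collection; all names here are illustrative, not the real `GenericMapStore` API:

```java
import java.util.HashMap;
import java.util.Map;

public class MapStoreSketch {
    /** Toy stand-in for the MongoDB collection behind the data connection. */
    final Map<String, String> collection = new HashMap<>();

    /** Called on a map miss: load the value from MongoDB. */
    String load(String key) {
        return collection.get(key);
    }

    /** Called on IMap.put: write the value through to MongoDB. */
    void store(String key, String value) {
        collection.put(key, value);
    }

    public static void main(String[] args) {
        MapStoreSketch store = new MapStoreSketch();
        store.store("42", "John Doe");        // write-through on put
        System.out.println(store.load("42")); // John Doe
    }
}
```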
The hazelcast-jet-contrib repository contained a MongoDB connector. It was based on `SourceBuilder`, which is nice in general, but for more flexibility we need to rework this connector to the low-level `Processor`/`ProcessorSupplier`/`ProcessorMetaSupplier` model:

- `SqlConnector`'s methods work on the DAG level, so it will be easier to reuse processors.

We can check if we can use MongoDB's replica sets. If yes, then our `ProcessorMetaSupplier` can spawn as many `ProcessorSupplier`s as MongoDB has replicas and configure the Mongo client to use that specific replica.
Each MongoDB source processor can read just a part of the collection. Two main approaches for reading a part of a collection are:

- split the `_id` range (see Document's docs) into as many slices as there will be processors and make each processor read its slice;
- use `_id mod processorGlobalIndex` to determine which element in the collection will be read by which processor.

A user may want to read from many collections in one source. In such a situation it's questionable whether each processor should deal with one collection or split its work across collections. In the first iteration each processor will deal with one collection; the `ProcessorSupplier` will create processors on each node per collection.
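The second approach can be sketched as follows. This is an illustrative client-side version with hypothetical names; a real implementation would push the modulo down to MongoDB (e.g. as a `$mod` filter) rather than fetch and filter:

```java
import java.util.ArrayList;
import java.util.List;

public class IdModPartitioner {
    /**
     * Returns the subset of document ids that the processor with the given
     * global index should read, using id mod totalProcessors.
     */
    static List<Long> idsForProcessor(List<Long> allIds,
                                      int processorGlobalIndex,
                                      int totalProcessors) {
        List<Long> slice = new ArrayList<>();
        for (long id : allIds) {
            if (Math.floorMod(id, totalProcessors) == processorGlobalIndex) {
                slice.add(id);
            }
        }
        return slice;
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(0L, 1L, 2L, 3L, 4L, 5L);
        // With 3 processors, processor 1 reads ids 1 and 4.
        System.out.println(idsForProcessor(ids, 1, 3)); // [1, 4]
    }
}
```

Every document is read by exactly one processor, and no coordination between processors is needed beyond knowing the total count and their own index.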
The source and sink should take read and write concerns into consideration.
The SQL Connector should support:
Defining the schema is not trivial (MongoDB is theoretically schema-less). Possible solutions:

- read the `$jsonSchema` validator (`db.getCollectionInfos({ name: "myCollection" })[0].options.validator`) and use it to create a schema, or
- read one element and check if a schema is available (`db.runCommand({listCollections: 1, filter: { name: "students" }})`).

See MongoDB's docs on this topic and the second documentation page.

All the methods can be combined and used in the following order, starting with `$jsonSchema`; each step will be invoked only if the previous one failed.
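The "read one element" fallback can be sketched as deriving SQL column types from a sampled document's Java value types. The type mapping below is an illustrative assumption, not the connector's final mapping, and a plain `Map` stands in for the BSON `Document`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaInference {
    /** Derives SQL column types from one sampled document's field values. */
    static Map<String, String> inferSchema(Map<String, Object> sampleDocument) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : sampleDocument.entrySet()) {
            columns.put(field.getKey(), sqlTypeOf(field.getValue()));
        }
        return columns;
    }

    /** Illustrative Java-to-SQL type mapping. */
    static String sqlTypeOf(Object value) {
        if (value instanceof Integer) return "INT";
        if (value instanceof Long)    return "BIGINT";
        if (value instanceof Double)  return "DOUBLE";
        if (value instanceof Boolean) return "BOOLEAN";
        if (value instanceof String)  return "VARCHAR";
        return "OBJECT"; // nested documents, arrays, etc.
    }

    public static void main(String[] args) {
        Map<String, Object> sample = new LinkedHashMap<>();
        sample.put("firstName", "Jan");
        sample.put("age", 42);
        System.out.println(inferSchema(sample)); // {firstName=VARCHAR, age=INT}
    }
}
```

Sampling a single document cannot see fields absent from that document, which is one reason the `$jsonSchema` validator is tried first.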
It should also be possible to push predicates down to Mongo where possible. For example, if the user invokes `select name from people where age > 18`, the `age > 18` predicate should be - if possible - translated to a Mongo predicate and `name` should become the projection.
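An illustrative sketch of that translation, using plain `Map`s in place of the Mongo driver's `Document`/`Bson` types (method names are hypothetical; the real connector would build driver filter objects):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PredicatePushdown {
    /** Translates a simple "field > value" predicate into a Mongo-style filter. */
    static Map<String, Object> greaterThanFilter(String field, Object value) {
        Map<String, Object> operator = new LinkedHashMap<>();
        operator.put("$gt", value);
        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put(field, operator);
        return filter;
    }

    /** Translates a projected column list into a Mongo-style projection. */
    static Map<String, Object> projection(String... fields) {
        Map<String, Object> projection = new LinkedHashMap<>();
        for (String field : fields) {
            projection.put(field, 1);
        }
        return projection;
    }

    public static void main(String[] args) {
        // SELECT name FROM people WHERE age > 18
        System.out.println(greaterThanFilter("age", 18)); // {age={$gt=18}}
        System.out.println(projection("name"));           // {name=1}
    }
}
```

With both pushed down, Mongo filters and trims the documents server-side, so only matching documents with the single projected field cross the network.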
The implementation will be split into 4 parts:

- PR#1: Import old connector to core repository
- PR#2: Source and sink for MongoDB
- PR#3: The SQL connector
- PR#4: Support for GenericMapStore (incl. mapping derivation)
What components in Hazelcast need to change? How do they change?
How does this work in an on-prem deployment?
Are there new abstractions introduced by the change? New concepts? If yes, provide definitions and examples.
How about on AWS and Kubernetes, platform operator?
How does the change behave in mixed-version deployments? During a version upgrade? Which migrations are needed?
How does this work in Cloud Viridian clusters?
What are the possible interactions with other features or sub-systems inside Hazelcast? How does the behavior of other code change implicitly as a result of the changes outlined in the design document?
What are the edge cases? What are example uses or inputs that we think are uncommon but are still possible and thus need to be handled? How are these edge cases handled?
`null` for missing fields and omit all additional fields.

What are the effects of possible mistakes by other Hazelcast team members trying to use the feature in their own code? How does the change impact how they will troubleshoot things?
Mention alternatives, risks and assumptions. Why is this design the best in the space of possible designs? What other designs have been considered and what is the rationale for not choosing them?
- the hazelcast-jet-contrib repository - but it's not as well tested as the main repo and is less visible
There is also a debatable change: the source and sink will now be processor implementations instead of using `SourceBuilder`. However, as mentioned in the Technical design, that is the preferred way to create a SQL connector.

Add links to any similar functionalities by other vendors, similarities and differentiators.
Is the change affecting asynchronous / background subsystems?
Which other inspection APIs exist?
Are there new APIs, or API changes (either internal or external)?
How would you document the new APIs? Include example usage.
What are the other components or teams that need to know about the new APIs and changes?
Which principles did you apply to ensure the APIs are consistent with other related features / APIs? (Cross-reference other APIs that are similar or related, for comparison.)
Is the change visible to users of Hazelcast or operators who run Hazelcast clusters?
Are there any user experience (UX) changes needed as a result of this change?
Are the UX changes necessary or clearly beneficial? (Cross-reference the motivation section.)
Which principles did you apply to ensure the user experience (UX) is consistent with other related features? (Cross-reference other features that have related UX, for comparison.)
Which other engineers or teams have you polled for input on the proposed UX changes? Which engineers or team may have relevant experience to provide feedback on UX?
Is usage of the new feature observable in telemetry? If so, mention where in the code telemetry counters or metrics would be added.
What might be the valuable metrics that could be shown for this feature in Management Center and/or Viridian Control Plane?
Should this feature be configured, enabled/disabled or managed from the Management Center? How do you think your change affects Management Center?
Does the feature require or allow runtime changes to the member configuration (XML/YAML/programmatic)?
Are usage statistics for this feature reported in Phone Home? If not, why?