seatunnel-transforms-v2/README.zh.md
本文档介绍了如何理解、开发和贡献 transform。
我们还提供了 Transform E2E 测试 来验证 transform 的数据输入和输出。
使用 SeaTunnel,你可以通过连接器读取或写入数据,但如果你需要在读取数据后或写入数据前处理数据,就需要使用 transform。
使用 transform 可以对数据行或字段进行简单的编辑,例如拆分字段、修改字段值、添加或删除字段。
Transform 从上游(源或 transform)接收数据类型输入,并将新的数据类型输出到下游(接收器或 transform)。这个过程就是数据类型转换。
示例 1:删除字段
| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |
| A | B |
|-----------|-----------|
| STRING | INT |
示例 2:排序字段
| B | C | A |
|-----------|-----------|-----------|
| INT | BOOLEAN | STRING |
| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |
示例 3:更新字段数据类型
| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |
| A | B | C |
|-----------|-----------|-----------|
| STRING | STRING | STRING |
示例 4:添加新字段
| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |
| A | B | C | D |
|-----------|-----------|-----------|-----------|
| STRING | INT | BOOLEAN | DOUBLE |
在数据类型转换之后,Transform 将接收来自上游(源或 transform)的数据行输入,编辑为具有新数据类型的数据行,并将其输出到下游(接收器或 transform)。这个过程称为数据转换。
Transform 与执行引擎解耦,任何 transform 实现都可以在所有引擎中运行,而无需更改代码或配置,这需要翻译层来适配 transform 和执行引擎。
示例:数据类型和数据的翻译
原始数据:
| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |
数据类型翻译:
| A | B | C |
|-------------------|-------------------|-------------------|
| ENGINE<STRING> | ENGINE<INT> | ENGINE<BOOLEAN> |
数据翻译:
| A | B | C |
|-------------------|-------------------|-------------------|
| ENGINE<"test"> | ENGINE<1> | ENGINE<false> |
createTransform 方法创建 transform 实例。factoryIdentifier 用于标识当前工厂的名称,这在配置文件中也会进行配置,以区分不同的 transform。optionRule 用于定义当前 transform 支持的参数。此方法可以用来定义参数的逻辑,比如哪些参数是必需的,哪些是可选的,哪些是互斥的等等。SeaTunnel 会使用 OptionRule 来验证用户配置的有效性。请参考下面的 Option。TableTransformFactory 上添加 @AutoService(Factory.class) 注解。我们可以从上游接收目录表输入,并从 TableTransformFactoryContext 获取 transform 配置。
@Override
public TableTransform createTransform(TableTransformFactoryContext context) {
return () ->
new SQLMultiCatalogFlatMapTransform(
context.getCatalogTables(), context.getOptions());
}
SeaTunnelTransform 提供了所有主要和核心的 API,你可以通过继承它来实现 transform。
获取该 transform 产生的目录表列表。
List<CatalogTable> getProducedCatalogTables();
或者获取该 transform 产生的目录表。
CatalogTable getProducedCatalogTable();
如果 transform 需要更改 schema,可以处理 SchemaChangeEvent。
default SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) {
return schemaChangeEvent;
}
编辑输入数据并输出新的数据到下游,使用 SeaTunnelMapTransform。
T map(T row);
或者编辑输入数据并输出新的数据到下游,使用 SeaTunnelFlatMapTransform。
List<T> flatMap(T row);
SingleFieldOutputTransform 抽象了单字段变换操作。
定义输出字段列。
protected abstract Column getOutputColumn();
定义输出字段的值。
protected abstract Object getOutputFieldValue(SeaTunnelRowAccessor inputRow);
MultipleFieldOutputTransform 抽象了多字段变换操作。
定义输出字段列。
protected abstract Column[] getOutputColumns();
定义输出字段的值。
protected abstract Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow);
AbstractSeaTunnelTransform 抽象了数据类型、表路径和字段变换操作。
转换输入行类型并输出新行类型。
protected abstract TableSchema transformTableSchema();
转换输入行数据并输出新数据行。
protected abstract R transformRow(SeaTunnelRow inputRow);
转换输入目录表路径并输出新目录表路径。
protected abstract TableIdentifier transformTableIdentifier();
包含了 transform 公共功能的基本实现,以及 transform 功能的高级封装。你可以通过实现这些类来快速开发 transform。
AbstractCatalogSupportFlatMapTransform 和 AbstractCatalogSupportMapTransform 的多表版本。包含了多表 transform 的封装。有关多表 transform 的更多信息,请参阅 transform-multi-table.md
你必须实现以下 API 中的一个:
将实现的子类添加到模块 seatunnel-transforms-v2 中。
在 SeaTunnel 根路径的 plugin-mapping.properties 文件中添加 transform 信息。
请参考 transform 的源代码
一旦你添加了一个新的插件,建议为它添加 e2e 测试。
我们有一个 seatunnel-e2e/seatunnel-transforms-v2-e2e 模块来帮助你完成这项工作。
例如,如果你想为 CopyFieldTransform 添加 e2e 测试,可以在 seatunnel-e2e/seatunnel-transforms-v2-e2e 模块中创建一个新测试,并在测试中扩展 TestSuiteBase 类。
public class TestCopyFieldTransformIT extends TestSuiteBase {
@TestTemplate
public void testCopyFieldTransform(TestContainer container) {
Container.ExecResult execResult = container.executeJob("/copy_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
一旦你的测试用例实现了 TestSuiteBase 接口并使用 @TestTemplate 注解启动,它将针对所有引擎运行作业,你只需要执行 executeJob 方法并提供你的 SeaTunnel 配置文件,它将提交 SeaTunnel 作业。