docs/content.zh/docs/core-concept/data-pipeline.md
由于在 Flink CDC 中,事件从上游流转到下游遵循 Pipeline 的模式,因此整个 ETL 作业也被称为 Data Pipeline。
一个 pipeline 包含着 Flink 的一组算子链。 为了描述 Data Pipeline,我们需要定义以下部分:
下面 是 Data Pipeline 的一些可选配置:
我们可以使用以下 yaml 文件来定义一个简单的 Data Pipeline 来同步 MySQL app_db 数据库下的所有表到 Doris:
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
我们可以使用以下 yaml 文件来定义一个复杂的 Data Pipeline 来同步 MySQL app_db 数据库下的所有表到 Doris,并给目标数据库名 ods_db 和目标表名前缀 ods_:
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
transform:
- source-table: adb.web_order01
projection: \*, format('%S', product_name) as product_name
filter: addone(id) > 10 AND order_id > 100
description: project fields and filter
- source-table: adb.web_order02
projection: \*, format('%S', product_name) as product_name
filter: addone(id) > 20 AND order_id > 200
description: project fields and filter
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
user-defined-function:
- name: addone
classpath: com.example.functions.AddOneFunctionClass
- name: format
classpath: com.example.functions.FormatFunctionClass
下面是 Data Pipeline 级别支持的配置选项。
请注意,虽然这些参数都是可选的,但至少需要指定其中一个。也就是说,pipeline 部分是必需的,不能为空。
| 参数 | 含义 | optional/required |
|---|---|---|
name | 这个 pipeline 的名称,会用在 Flink 集群中作为作业的名称。 | optional |
parallelism | pipeline的全局并发度,默认值是1。 | optional |
local-time-zone | 作业级别的本地时区。 | optional |
execution.runtime-mode | pipeline 的运行模式,包含 STREAMING 和 BATCH,默认值是 STREAMING。 | optional |
route-mode | [路由]({{< ref "docs/core-concept/route" >}}#路由模式)规则的匹配模式。可选值:ALL_MATCH(默认,应用所有匹配的规则)或 FIRST_MATCH(只应用第一个匹配的规则)。 | optional |
schema.change.behavior | 如何处理 [schema 变更]({{< ref "docs/core-concept/schema-evolution" >}})。可选值:[exception]({{< ref "docs/core-concept/schema-evolution" >}}#exception-mode)、[evolve]({{< ref "docs/core-concept/schema-evolution" >}}#evolve-mode)、[try_evolve]({{< ref "docs/core-concept/schema-evolution" >}}#tryevolve-mode)、[lenient]({{< ref "docs/core-concept/schema-evolution" >}}#lenient-mode)(默认值)或 [ignore]({{< ref "docs/core-concept/schema-evolution" >}}#ignore-mode)。 | optional |
schema.operator.uid | Schema 算子的唯一 ID。此 ID 用于算子间通信,必须在所有算子中保持唯一。已废弃:请使用 operator.uid.prefix 代替。 | optional |
schema-operator.rpc-timeout | SchemaOperator 等待下游 SchemaChangeEvent 应用完成的超时时间,默认值是 3 分钟。 | optional |
operator.uid.prefix | Pipeline 中算子 UID 的前缀。如果不设置,Flink 会为每个算子生成唯一的 UID。 建议设置这个参数以提供稳定和可识别的算子 ID,这有助于有状态升级、问题排查和在 Flink UI 上的诊断。 | optional |
注意:虽然上述参数都是可选的,但至少需要指定其中一个。pipeline 部分是必需的,不能为空。