基于 Flink 1.15 版本
1. Flink SQL 概述
1.1 离线计算 vs 实时计算
Flink SQL 最大的亮点是可以支持实时数据计算,不像之前接触的 Hive SQL 只能算离线数据。这个能力能大大简化企业里实时分析的工作。
离线计算这边,输入表 clicks 里的数据是静态的,不会变。SQL 语句执行一次算完就结束,输出表里的结果也就固定了。
实时计算那边,输入表 clicks 里的数据是实时产生的,每来一条新数据,SQL 就会重新跑一遍,输出表的结果也跟着更新。输入输出都是 "活" 的。
简单说就是数据来源不一样,离线是一次性到齐,实时是源源不断来。实时场景下,输出表会跟着输入变化而实时更新。
1.2 动态表是什么
动态表是 Flink SQL 能做流处理的核心概念。简单理解就是:一张数据会变的表就是动态表。没有动态表,Flink SQL 就没法处理流式数据。
实时数据流过来,在这个流上定义一张表,如果数据还在不断追加变化,这张表就是动态表。
2. SQL 解析引擎
2.1 Apache Calcite 是什么
Flink SQL 的解析引擎用的是 Apache Calcite,这是一个开源的 SQL 解析工具,专门用来做语法解析这块的事情。它不管存储、不管计算,就是帮你把 SQL 语句变成可以执行的代码。
Calcite 会把 SQL 语句解析成一棵树(AST 抽象语法树),然后对这棵树进行各种操作,最终把 SQL 里的计算逻辑转成具体的 Flink 代码。
2.2 主流解析引擎对比
Calcite 功能更完整,不仅能解析还能做优化,ANTLR 就只是单纯的解析器。
2.3 Calcite 的执行过程
Calcite 处理 SQL 大致分四步:
2.4 Flink SQL 完整的执行流程
SQL 语句
↓ SQL Parser(解析成 AST)
↓ SQL Validate(结合元数据校验合法性)
↓ Logical Plan(转成逻辑计划)
↓ Optimized Logical Plan(Calcite 规则 + Flink 定制规则优化)
↓ Flink Physical Plan(转成 Flink 物理计划)
↓ Flink Execution Plan(生成算子代码)
↓ 执行
3. DDL 建表语句
Flink SQL 里 DDL 主要就是创建表、修改表、删除表,实际工作中最常用的就是创建表。
3.1 需要哪些依赖
用之前一定注意看 Flink 的版本,不同小版本之间 Connector 的参数写法可能不一样。官方文档版本和代码里引入的包版本必须保持一致。
3.2 创建执行环境
需要用 StreamTableEnvironment.create() 来创建环境,同时指定是批处理还是流处理模式。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
3.3 建表基本语法
CREATE TABLE 表名 (
字段名 字段类型,
...
) WITH (
'connector' = 'xxx',
...
)
3.4 一个完整例子:File System 读数据,Print 输出
这个组合在测试环境里很常用,数据从 HDFS 文件里读,结果直接打印到控制台,方便验证逻辑对不对。
先建一个输入表,指定好文件路径和格式:
CREATE TABLE file_source (
name STRING,
age INT
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://namenode:9000/path/to/data.json',
'format' = 'json',
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
)
再建一个输出表,用 print 把结果打印出来:
CREATE TABLE print_sink (
age INT,
cnt BIGINT
) WITH (
'connector' = 'print'
)
然后写业务逻辑,统计每个年龄有多少人:
INSERT INTO print_sink
SELECT age, COUNT(*) AS cnt
FROM file_source
GROUP BY age
3.5 容易踩的坑
4. 表类型
4.1 总的分类
4.2 静态表
静态表里的数据不会随时间实时变化,一般就是批处理模式下定义的表。比如用 File System 在批处理模式建的表就是静态的。
要注意的是静态表的数据也可能变化,比如按天、按小时新增数据进去,但这种变化是离线的、可预期的,不是实时的。
4.3 动态表
动态表正好相反,数据会随时间实时变化。流处理模式下用 Kafka 或者 File System 创建的表都是动态表。
动态表是 Flink SQL 能做流处理的关键,没有它就没有流式计算。
4.4 版本表
版本表是 Flink 1.12 引入的概念(之前的 1.11 版本叫时态表,只有 Blink 引擎支持)。它本质上是带有主键和事件时间属性的动态表,能够记录每个键的历史值。
这个概念和 Hive 里的拉链表很像,都是用来记录数据变化历史的。有了版本表,可以查询某个时间点的历史数据。
建版本表的时候需要两步:先用 PRIMARY KEY 定义主键,然后用 WATERMARK 定义事件时间字段。版本表主要用在双流 join 校验的场景。
4.5 时态表函数
时态表函数也是用来访问历史数据的,但它是针对普通动态表(不是版本表)的。只能用在只追加类型的动态表上,而且不能用 SQL DDL 建,必须通过 Table API 写代码来注册。主要场景也是双流 join。
5. 连续查询
5.1 什么是连续查询
这个概念是相对于 "一次查询" 来说的。
离线场景下,对静态表跑 SQL,计算完就结束了,不会再跑第二次,这是 "一次查询"。
实时场景下,对动态表跑 SQL,只要动态表里有新数据进来,计算就会再次触发,数据不断来,计算就不断触发,这种多次执行的查询就叫 "连续查询"。
说白了就是:动态表数据一变,SQL 就重新算一遍,输入输出都是动态表。
5.2 连续查询的执行过程
实时数据流进 Flink 做 SQL 计算,整体分三步:
5.3 案例一:普通分组聚合
场景就是按 name 分组统计出现次数。
第一批数据进来:tom 出现一次,输出 tom 1;jack 出现一次,输出 jack 1。
第二批数据进来:tom 又出现了,此时 tom 已经出现过,所以更新之前的统计结果为 tom 2,jack 不变。
再进来 jessica,就输出 jessica 1。
这种情况下输出表里有新增也有更新,因为同一个人又出现了。
5.4 案例二:滚动窗口聚合
场景是按小时滚动的窗口,每小时做一次分组聚合。
SELECT name,
TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
COUNT(*) AS cnt
FROM clicks
GROUP BY name, TUMBLE(event_time, INTERVAL '1' HOUR)
特点是因为时间只会往前走,不同时间窗口的结果互不重叠,所以输出表里的数据全都是新增,不会有更新。这种也叫纯追加流。
5.5 Change Log 是什么
连续查询的输出其实可以理解成一种 Change Log 数据流,类似 MySQL 的 binlog,里面记录了数据的变更操作。
6. 数据类型
Flink SQL 支持三类数据类型:基础类型、复合类型、自定义类型。实际工作中基础类型和复合类型完全够用,自定义类型基本用不上。
6.1 基础类型
字符串:直接用 STRING。
二进制:BINARY、VARBINARY。
数值:
布尔:BOOLEAN
空值:NULL
日期时间(这块内容多,容易混淆):
关于 TIMESTAMP 和 TIMESTAMP_LTZ 的区别:
TIMESTAMP 不带时区,如果你没设过时区,默认会用 UTC,比中国时间(UTC+8)少 8 个小时。TIMESTAMP_LTZ 会读本地时区,建议直接用这个。
在代码里可以手动设置时区:
tableEnv.getConfig.setLocalTimeZone(TimeZone.getTimeZone("Asia/Shanghai"))
Interval 的用法:给时间做偏移,比如加减几天、几个月、几个小时。
6.2 复合类型
这些在 Hive 里用得很多,Flink 里用法也差不多,后面遇到具体场景再说。
7. 动态表输出编码
动态输出表要转成数据流输出去,需要对表里的数据变更行为做编码。Flink SQL 支持三种编码方式,对应三种不同的数据流。
7.1 Append Only 流(纯追加流)
只有新增操作,没有更新和删除。适用场景是时间窗口聚合、或者不涉及分组的普通实时 ETL。
7.2 Retract 流(回撤流)
包含新增、更新、删除三种操作。更新的时候比较特殊,会发两条数据:先发一条 -U(Update Before)把旧数据撤回,再发一条 +U(Update After)把新数据加进去。
所以每次更新实际上是两步操作。如果下游还要接着处理这个数据流,必须正确处理 -U 和 +U,否则数据会乱掉。
实际场景里,做分组聚合的时候,因为同一个 key 可能出现多次,结果需要更新,就会产生 Retract 流。
7.3 Upsert 流(插入更新流)
Upsert 流也是包含新增、更新、删除,但更新的时候只发一条数据,直接用新数据覆盖旧数据,不需要先撤回再加。比 Retract 流效率高。
使用 Upsert 流的前提是表里必须定义主键。
7.4 三种流怎么选
7.5 实际用的时候注意什么
如果 SQL 里用的是时间窗口聚合或者没有分组操作,输出是 Append Only 流,用普通的 Connector 就行。
如果 SQL 里用了分组聚合,会产生更新操作,需要 Retract 或 Upsert 流,写入 Kafka 的时候要选 upsert-kafka 的 Connector,普通 kafka Connector 不支持。
如果要接自己的第三方存储系统,就得自己解析数据流里的这些标识(+I、-U、+U、-D),然后按需写到自己的存储里。
8. 日期时间函数
8.1 常用函数
这些都是跟时区挂钩的,设置了东八区就返回北京时间。
8.2 日期格式化
如果觉得时间戳里毫秒位数太多不想要,可以用 DATE_FORMAT 来格式化:
SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm:ss') FROM orders
8.3 CURRENT_TIMESTAMP 和 CURRENT_ROW_TIMESTAMP 的区别
8.4 时区一定要设
建议在代码里手动把时区设成东八区:
tableEnv.getConfig.setLocalTimeZone(TimeZone.getTimeZone("Asia/Shanghai"))
不设的话,程序会读机器本身的时区,可能会拿到 UTC 时间,比北京少 8 小时,容易出 bug。
附录
常用 Connector
DataGen 生成测试数据的技巧
datagen 很适合在本地跑流处理测试用。如果要测试分组聚合的更新逻辑,需要让某个字段产生重复数据,可以限制它的取值范围:
CREATE TABLE orders (
order_id BIGINT,
price DECIMAL(10, 2),
order_time TIMESTAMP
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'order_id.gen-kind' = 'sequence',
'order_id.gen.min' = '100',
'order_id.gen.max' = '105'
)
这样 order_id 会在 100 到 105 之间循环生成,方便测试重复 key 触发的更新场景。