Flink SQL与API简介
简而言之,Flink API是一套查询API,可以将文件系统、Kafka队列等外部数据在Flink中映射成表结构,然后通过API进行查询。Flink SQL则是可以直接通过SQL语句进行查询统计。
官方文档地址:https://ci.apache.org/projects/flink/flink-docs-release-1.13/
入门代码示例
先创建一个Maven工程,然后在pom.xml中添加依赖:
1 |
|
再创建一个测试数据:books.csv
1 | 西游记,吴承恩,28.00 |
创建Java程序,从本地csv中读取文件映射成表,再通过API和SQL进行查询
1 | public class FileSystemApp { |
Flink SQL 连接Kakfa
1 | public class KafkaApp { |
建表语句
1 | CREATE TABLE books ( -- 指定表名 |
当创建好Java程序后,首先先去kafka上创建一个主题
1 | kafka-topics --zookeeper hadoop001:2181 --create --replication-factor 3 --partitions 3 --topic books_topic |
此时再运行Java程序,再启动一个Kakfa生产者,发送消息进行测试,运行成功的话会在Java控制台输出结果
1 | # 运行Kafka生产者 |
DataStream转Table
Flink消费Kafka的时候,有些数据是不适合直接转换为表的,比如说多层的JSON、CSV和行数据等。所以就可以先流式读取数据,然后再map转成实体,再将实体转换为表。
使用如下代码进行转换
1 | Table booksTable = tableEnv.fromDataStream(dataStream, "name as bookName, author, price"); |
创建临时视图
View和Table的Schema完全相同,可以认为View和Table是等价的。
1 | //直接从数据流转换 |
输出表
通过TableSink可以将数据输出到文件、数据库、消息队列等中。通过Table.insertInto()实现。
1.输出到文件系统
1 | public class KafkaApp { |
2.输出到MySQL
1 | CREATE TABLE books ( |
3.输出到ES
根据ES版本来导入POM依赖
1 | <!-- 6.x --> |
1 | CREATE TABLE books ( |
4.输出到Kafka
添加pom依赖
1 | <dependency> |
1 | CREATE TABLE books ( |
SQL优化之查看执行计划
1 | String explain = tableEnv.explainSql("select name,author,price from books_table where price > 12"); |
Processing Time & Event Time
Processing Time 是流数据正在处理的时间,在SQL中指定PROCTIME()
来使用。
在有数据乱序的情况下采用事件时间,通常采用数据中的时间戳来做watermark,如下代码便是采用ts做watermark,并允许5秒的乱序。
1 | CREATE TABLE books ( |
窗口
最新版本的一些条件传Expression,可以以$()定义,如$(“10.minutes”)
滚动窗口
1 | // 10分钟一滚动,通过rowtime字段进行,是事件时间窗口,取别名为w |
滑动窗口
1 | // 10分钟一个窗口,通过rowtime字段进行,是事件时间窗口,滑动步长为5分钟,取别名为w |