A Quick Start to Flink SQL and the Table API

In a nutshell, the Flink Table API is a query API: external data such as files on a file system or Kafka topics can be mapped to table structures inside Flink and then queried through the API. Flink SQL goes one step further and lets you run queries and aggregations directly with SQL statements.
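
As a quick taste of the two styles, the sketch below runs the same query once with the Table API and once with SQL. It is only an illustration: the registered table name books, its fields, and the tableEnv variable are assumptions, not code from this article.

// Table API: fluent method calls on a Table object
Table apiResult = tableEnv.from("books")
        .select(Expressions.$("name"), Expressions.$("price"))
        .where(Expressions.$("price").isGreater(12));

// Flink SQL: the same query expressed as a SQL string
TableResult sqlResult = tableEnv.executeSql("SELECT name, price FROM books WHERE price > 12");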

Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/

Getting-started code example

First create a Maven project, then add the dependencies to pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>flink-realtime-mall</artifactId>
        <groupId>com.whoiszxl</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-sql</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

</project>

Then create a test data file, books.csv:

西游记,吴承恩,28.00
三国演义,罗贯中,13.00
红楼梦,曹雪芹,12.00
水浒传,施耐庵,11.00

Create a Java program that reads the local CSV file, maps it to a table, and queries it with both the Table API and SQL:

import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

public class FileSystemApp {

    public static String CSV_PATH = "/opt/data/books.csv";

    public static void main(String[] args) throws Exception {
        //1. Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Set the parallelism; use 1 for local testing
        env.setParallelism(1);

        //3. Configure checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); //checkpoint every 5 seconds, exactly-once mode
        env.getCheckpointConfig().setCheckpointTimeout(60000); //checkpoint timeout of one minute
        StateBackend fsStateBackend = new FsStateBackend("hdfs://hadoop001:8020/appName/flink/checkpoint");
        env.setStateBackend(fsStateBackend); //persist checkpoints to HDFS

        //4. Define the settings for the table environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()      //create a builder instance
                .useBlinkPlanner()  //use the Blink planner
                .inStreamingMode()  //streaming mode; .inBatchMode() selects batch mode
                .build();

        //5. Create the StreamTableEnvironment from the streaming environment and the settings
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        //6. Define a file system source. connect() is marked @Deprecated; creating the table with a DDL statement via executeSql is recommended
        tableEnv.connect(new FileSystem().path(CSV_PATH))
                .withFormat(new Csv()) //parse the file as CSV
                .withSchema(           //define the table schema
                        new Schema()
                                .field("name", DataTypes.STRING())
                                .field("author", DataTypes.STRING())
                                .field("price", DataTypes.DOUBLE())
                ).createTemporaryTable("books_table"); //register it as a temporary table

        Table resultTable = tableEnv.from("books_table")   //the table to query
                .select(Expressions.$("name"),             //the fields to select
                        Expressions.$("author"),
                        Expressions.$("price"))
                .where(Expressions.$("name").isEqual("西游记")); //the filter condition

        //7. Convert the table to an append stream and print it
        tableEnv.toAppendStream(resultTable, Book.class).print();

        //8. Run the same kind of query with SQL and print the result
        TableResult tableResult = tableEnv.executeSql("select name,author,price from books_table where price > 12");
        tableResult.print();

        env.execute();
    }
}
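
The call tableEnv.toAppendStream(resultTable, Book.class) needs a Book POJO, which the original listing does not include. A minimal sketch of such a class, assuming the Lombok dependency already declared in the pom (field names and types must match the table schema):

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data               //generates getters, setters, equals/hashCode and toString
@NoArgsConstructor  //Flink's POJO type extraction needs a public no-argument constructor
@AllArgsConstructor
public class Book {
    private String name;
    private String author;
    private Double price;
}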
The next example reads from Kafka instead; here the source table is declared with a DDL statement through executeSql() rather than the deprecated connect() API:

import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaApp {

    public static void main(String[] args) throws Exception {
        //1. Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Set the parallelism; use 1 for local testing
        env.setParallelism(1);

        //3. Configure checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); //checkpoint every 5 seconds, exactly-once mode
        env.getCheckpointConfig().setCheckpointTimeout(60000); //checkpoint timeout of one minute
        StateBackend fsStateBackend = new FsStateBackend("hdfs://hadoop001:8020/appName/flink/checkpoint");
        env.setStateBackend(fsStateBackend); //persist checkpoints to HDFS

        //4. Define the settings for the table environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()      //create a builder instance
                .useBlinkPlanner()  //use the Blink planner
                .inStreamingMode()  //streaming mode; .inBatchMode() selects batch mode
                .build();

        //5. Create the StreamTableEnvironment from the streaming environment and the settings
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        //6. Create the source table with a DDL statement via executeSql
        tableEnv.executeSql("CREATE TABLE books (" +
                "  name STRING," +
                "  author STRING," +
                "  price DECIMAL(8,2)," +
                "  ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts as ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'books_topic'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'properties.bootstrap.servers' = 'hadoop001:9092'," +
                "  'format' = 'json'" +
                ")");

        TableResult tableResult = tableEnv.executeSql("select name,author,price from books where price >= 13");
        tableResult.print();
        //executeSql already submits the query as a Flink job; env.execute() is not needed here
        //(and would fail, since this program defines no DataStream operators)
    }
}

The CREATE TABLE statement explained

CREATE TABLE books (                                    -- table name
    name STRING,                                        -- book title
    author STRING,                                      -- book author
    price DECIMAL(8,2),                                 -- price
    ts TIMESTAMP(3),                                    -- event-time timestamp field
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND        -- build the watermark on ts, allowing 5 seconds of out-of-orderness
) WITH (
    'connector' = 'kafka',                              -- use the kafka connector
    'topic' = 'books_topic',                            -- the source topic
    'scan.startup.mode' = 'latest-offset',              -- start consuming from the latest offset
    'properties.bootstrap.servers' = 'hadoop001:9092',  -- kafka broker address
    'format' = 'json'                                   -- the messages are JSON
)

Once the Java program is ready, first create the topic on Kafka:

kafka-topics --zookeeper hadoop001:2181 --create --replication-factor 3 --partitions 3 --topic books_topic

Now run the Java program, then start a Kafka console producer and send some test messages. If everything works, the matching records are printed on the Java console.

# Start a Kafka console producer
kafka-console-producer --broker-list hadoop001:9092 --topic books_topic

# Send some messages; records whose price is greater than or equal to 13 are printed on the console
>{"name":"三国演义", "author": "罗贯中", "price": 15.00}
>{"name":"西游记", "author": "吴承恩", "price": 20.00}
>{"name":"百年孤独", "author": "马尔克斯", "price": 13.00}
>{"name":"白鹿原", "author": "陈忠实", "price": 9.00}

Converting a DataStream to a Table

When Flink consumes from Kafka, some payloads are not suitable for mapping to a table directly, for example nested JSON, CSV lines, or other raw row data. In that case you can read the data as a stream first, map each record to an entity class, and then convert the stream of entities into a table.

The conversion itself is a single call (a fuller end-to-end sketch follows the snippet):

Table booksTable = tableEnv.fromDataStream(dataStream, "name as bookName, author, price");
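
For context, here is a minimal end-to-end sketch of that pattern. It assumes the Book POJO shown earlier and CSV-style lines ("name,author,price") on the books_topic topic; the consumer properties and the parsing logic are illustrative assumptions, not code from the original article.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DataStreamToTableApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop001:9092");
        props.setProperty("group.id", "books_group");

        //1. Read the raw lines from Kafka as a plain String stream
        DataStream<String> rawStream = env.addSource(
                new FlinkKafkaConsumer<>("books_topic", new SimpleStringSchema(), props));

        //2. Map every CSV-style line to a Book entity
        DataStream<Book> bookStream = rawStream
                .map(line -> {
                    String[] parts = line.split(",");
                    return new Book(parts[0], parts[1], Double.parseDouble(parts[2]));
                })
                .returns(Book.class); //help Flink's type extraction for the lambda

        //3. Convert the entity stream into a Table, renaming the name field on the way
        Table booksTable = tableEnv.fromDataStream(bookStream, "name as bookName, author, price");

        tableEnv.toAppendStream(booksTable, Row.class).print();
        env.execute();
    }
}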

Creating temporary views

A view has exactly the same schema as a table; for most purposes a view and a table can be treated as equivalent.

//create the view directly from a DataStream
tableEnv.createTemporaryView("booksView", dataStream);

//create the view from a DataStream and specify the fields
tableEnv.createTemporaryView("booksView", dataStream, "name as bookName, author, price");

//create the view from an existing Table
tableEnv.createTemporaryView("booksView", booksTable);
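
Once registered, the view can be queried by name just like a table; a small usage sketch (the bookName field assumes the view was created with the field list shown above):

//query the view with SQL
tableEnv.executeSql("select bookName, author, price from booksView where price > 12").print();

//or reference it from the Table API
Table viewResult = tableEnv.from("booksView")
        .select(Expressions.$("bookName"), Expressions.$("price"));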

Output tables (table sinks)

A TableSink writes data out to files, databases, message queues, and so on. In code this is done either with Table.executeInsert() (the older Table.insertInto() is deprecated) or by running an INSERT INTO statement with executeSql(), as sketched below.
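
For example, assuming a sink table named books_result has already been registered (the name here is only for illustration), either style writes the query result into it:

//variant 1: run an INSERT INTO statement
tableEnv.executeSql("insert into books_result select name, author, price from books where price >= 13");

//variant 2: write a Table object into the registered sink table
Table expensiveBooks = tableEnv.sqlQuery("select name, author, price from books where price >= 13");
expensiveBooks.executeInsert("books_result");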

1. Writing to the file system

import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaApp {

    public static void main(String[] args) throws Exception {
        //1. Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Set the parallelism; use 1 for local testing
        env.setParallelism(1);

        //3. Configure checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); //checkpoint every 5 seconds, exactly-once mode
        env.getCheckpointConfig().setCheckpointTimeout(60000); //checkpoint timeout of one minute
        StateBackend fsStateBackend = new FsStateBackend("hdfs://hadoop001:8020/appName/flink/checkpoint");
        env.setStateBackend(fsStateBackend); //persist checkpoints to HDFS

        //4. Define the settings for the table environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()      //create a builder instance
                .useBlinkPlanner()  //use the Blink planner
                .inStreamingMode()  //streaming mode; .inBatchMode() selects batch mode
                .build();

        //5. Create the StreamTableEnvironment from the streaming environment and the settings
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        //6. Create the source table with a DDL statement via executeSql
        tableEnv.executeSql("CREATE TABLE books (" +
                "  name STRING," +
                "  author STRING," +
                "  price DECIMAL(8,2)," +
                "  ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts as ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'books_topic'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'properties.bootstrap.servers' = 'hadoop001:9092'," +
                "  'format' = 'json'" +
                ")");

        //7. Create the output table with a DDL statement, backed by the filesystem connector
        tableEnv.executeSql("CREATE TABLE books_result (" +
                "  name STRING," +
                "  author STRING," +
                "  price DECIMAL(8,2)" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'sink.partition-commit.delay'='1 h'," +
                "  'sink.partition-commit.policy.kind'='success-file'," +
                "  'path' = '../books_result.csv'," +
                "  'format' = 'csv'" +
                ")");

        //8. Run the INSERT statement; executeSql submits it as a job, so env.execute() is not needed here
        tableEnv.executeSql("insert into books_result select name,author,price from books where price >= 13");
    }
}

2. Writing to MySQL

CREATE TABLE books (
    name STRING,
    author STRING,
    price DECIMAL(8,2)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://hadoop001:3306/mydatabase',
    'username' = 'root',
    'password' = '123456',
    'table-name' = 'books'
);
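
The usage pattern is the same as for the file system sink: register the sink table with executeSql and run an INSERT INTO against it. A short sketch, reusing the Kafka source table books from earlier; the sink is registered here as books_mysql, an illustrative name chosen to avoid clashing with the source table:

//register the JDBC sink table (same DDL as above, but under a distinct name)
tableEnv.executeSql("CREATE TABLE books_mysql (" +
        "  name STRING," +
        "  author STRING," +
        "  price DECIMAL(8,2)" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://hadoop001:3306/mydatabase'," +
        "  'username' = 'root'," +
        "  'password' = '123456'," +
        "  'table-name' = 'books'" +
        ")");

//write the query result into MySQL
tableEnv.executeSql("insert into books_mysql select name, author, price from books where price >= 13");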

3. Writing to Elasticsearch

Add the Maven dependency that matches your Elasticsearch version:

<!-- 6.x -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.13.0</version>
</dependency>

<!-- 7.x and later versions -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
CREATE TABLE books (
    name STRING,
    author STRING,
    price DECIMAL(8,2)
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'books'
);

4. Writing to Kafka

Add the Maven dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
CREATE TABLE books (
    name STRING,
    author STRING,
    price DECIMAL(8,2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'books_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'books_group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)

SQL optimization: viewing the execution plan

String explain = tableEnv.explainSql("select name,author,price from books_table where price > 12");
System.out.println(explain);
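
The plan of a query built with the Table API can be inspected the same way through Table.explain(); a one-line sketch reusing the resultTable from the file system example:

//print the AST and the optimized execution plan of a Table object
System.out.println(resultTable.explain());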

Processing Time & Event Time

Processing time is the wall-clock time at which a record is being processed; in SQL it is exposed by declaring a column as PROCTIME().

When data can arrive out of order, event time is used instead, and the watermark is usually derived from a timestamp carried in the data itself. In the DDL below the ts column drives the watermark, and up to 5 seconds of out-of-orderness is tolerated.

CREATE TABLE books (
    name STRING,
    author STRING,
    price DECIMAL(8,2),
    proc_time as PROCTIME(),
    ts TIMESTAMP(3),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://hadoop001:3306/mydatabase',
    'username' = 'root',
    'password' = '123456',
    'table-name' = 'books'
);
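
When the table is created from a DataStream instead of with DDL, both time attributes can be declared through the Table API as well. A minimal sketch, assuming a bookStream whose records already carry event timestamps and watermarks (field names follow the book example):

import static org.apache.flink.table.api.Expressions.$;

//proc_time is appended as a processing-time attribute,
//ts is declared as the event-time (rowtime) attribute of the resulting table
Table books = tableEnv.fromDataStream(bookStream,
        $("name"),
        $("author"),
        $("price"),
        $("proc_time").proctime(),
        $("ts").rowtime());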

Windows

In recent versions these window definitions take Expression arguments instead of strings: a field is referenced with $("fieldName") and a time interval is written as lit(10).minutes(). The string style shown below still compiles in 1.12 but is deprecated; an Expression-based version is shown after the sliding-window examples.

Tumbling windows

// a 10-minute tumbling window over the rowtime field (event time), aliased as w
.window(Tumble.over("10.minutes").on("rowtime").as("w"));

// a 10-minute tumbling window over the proctime field (processing time), aliased as w
.window(Tumble.over("10.minutes").on("proctime").as("w"));

// a tumbling count window of 10 rows over the proctime field, aliased as w
.window(Tumble.over("10.rows").on("proctime").as("w"));

Sliding windows

// a 10-minute sliding window over the rowtime field (event time) with a 5-minute slide, aliased as w
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));

// a 10-minute sliding window over the proctime field (processing time) with a 5-minute slide, aliased as w
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));

// a sliding count window of 10 rows with a slide of 5 rows over the proctime field, aliased as w
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));