Flink高级API极速入门

前言

在学习完零基础入门后,就能够学会如何创建流,读取数据,如何使用算子进行数据处理,如何将计算结果输出到MySQL,Redis等地方了。但是因为在数据传输过程中会出现很多各种各样的情况,比如说上报的数据时间是乱序的,这个就需要用到Window窗口了。

Window概念介绍

Window本质上是流处理与批处理的桥梁,可以让你在无限流里创建有限流进行操作,比如说可以在源源不断的订单数据中计算5分钟内的销售额,计算最近100个订单都是哪个地区的。

所以此时通过这个例子,我们就能发现Window是分两种,一种是CountWindow,按照指定的数据条数来创建Window,就是上面所说的通过100个订单来统计地区,另一种是TimeWindow,按照指定的时间来生成窗口。

TimeWindow其实也分了三种,一种滚动窗口(Tumbling Window),一种滑动窗口(Sliding Window),还有一种会话窗口(Session Window)。

  1. 滚动窗口:窗口数据没有重叠,比如说每五分钟统计一次,那么一小时就有12个不重叠的窗口创建。
  2. 滑动窗口:窗口数据是可能存在重叠的,比如说窗口是五分钟的范围,然后滑动参数是3分钟,当第一个窗口运行到三分钟的时候,第二个窗口就启动了,所以此时第一个窗口的后两分钟和第二个窗口的前两分钟是重叠的。
  3. 会话窗口:类似于web的session,当一段时间没有收到新数据的时候就会生成新的窗口。

Window API

滚动窗口

1
2
3
4
5
6
7
//desc: 每10秒统计一次窗口里的最小值
//1. 对2元素的元组的第一个进行分组
//2. 然后创建一个10秒长的滚动窗口,一分钟6个窗口
//3. 对元组第二个元素进行取最小值操作
ds.keyBy(data -> data.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.minBy(1);

滑动窗口

1
2
3
4
5
6
7
//desc: 每隔10秒开启一个窗口,统计之后60秒里的最小值
//1. 对2元素的元组的第一个进行分组
//2. 然后创建一个60秒长的滑动窗口,每10秒创建一次,如果窗口和滑动间隔都是同样的话,那就是一个滚动窗口了
//3. 对元组第二个元素进行取最小值操作
ds.keyBy(data -> data.f0)
.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10)))
.minBy(1);

计数窗口

1
2
3
4
5
6
7
8
9
//desc 滚动方式每一百次求一次和
ds.keyBy(data -> data.f0)
.countWindow(100)
.sum(1);

//desc 滑动方式每10次创建一个窗口,每个窗口容纳100次。
ds.keyBy(data -> data.f0)
.countWindow(100, 10)
.sum(1);

Window Function

常用增量聚合函数有:ReduceFcuntion,AggregateFunction,每有数据来就进行计算,保持一个简单的状态。

全窗口函数有:ProcessWindowFunction。先收集窗口中所有数据,等到计算的时候再统一遍历处理。

Flink的多种时间状态

  1. 每条记录上传时,比如在APP发生了一次点击商详的事件,那么在点击的那一刻就会创建一个Event Time,就是这个事件时间。(对于统计与时间有强关系的流,事件时间则更加合适,大部分的流计算都用此时间。)
  2. 当此次点击商详时间的日志上传到SpringBoot项目里,再发送到了Kafka,Flink消费到Kafka里的日志时,就会产生一个Ingestion Time,这个就是数据进入Flink的摄入时间。
  3. 在算子执行的过程中,就会产生一个Processing Time,是机器的本地时间,无配置其他时间则默认此时间。

Watermark

在我们点击商品详情的时候,消息日志从APP客户端到Web端,再到Kafka消息队列,再到Flink中,是需要有一定时间的。虽然基本上每次点击都是有顺序的,但是不能排除存在因为网络延迟,中间件宕机等缘故,而导致乱序,导致点击加购的事件会发生在进入商详之前。

此时就需要一个watermark机制的引入来处理乱序事件,watermark是一个告诉Flink消息能够延迟多久的机制,比如说10秒的窗口,水位线为2秒,那么此时Flink就会等到12秒的时候才关闭窗口进行计算。

引入方式

比如说接收了一个订单信息流,此时指定一个乱序时间戳提取器,指定提取的事件时间为创建时间,并且此时间必须是毫秒数。

1
2
3
4
5
6
7
ds.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderEntity>() {
@Override
public long extractTimestamp(OrderEntity element) {
//指定数据源中的时间戳,毫秒级时间戳
return element.getCreateTime();
}
});

底层API ProcessFunction

只有底层API才能够访问事件的时间戳和水位线和注册定时事件,还可以输出特定的事件如超时事件。Flink有如下8个Process Function:

1
2
3
4
5
6
7
8
1. ProcessFunction
2. KeyedProcessFunction
3. CoProcessFunction
4. ProcessJoinFunction
5. BroadcastProcessFunction
6. KeyedBroadcastProcessFunction
7. ProcessWindowFunction
8. ProcessAllWindowFunction

KeyedProcessFunction简介

此函数用来操作KeyedStream,KeyedProcessFunction会去处理流中的每个元素,输出为0,1,或多个元素,在具有基础的open(),close(),getRuntimeContext()等方法外,还有额外的两个方法:processElement, onTimer。

代码实现:

1
//todo 

SideOutput

侧输出流可以产生多条数据类型不一样的流,与split切分类似,只是split切分的流类型是一样的。

代码实现,实现按国家输出侧流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//1. 定义CN标签和USA标签
OutputTag<JSONObject> cnTag = new OutputTag<JSONObject>("cn"){};
OutputTag<JSONObject> usaTag = new OutputTag<JSONObject>("usa"){};

//2. 通过process方法对每个元素进行处理,通过country判断进行分流
SingleOutputStreamOperator<JSONObject> otherCountry = jsonObjDS.process(new ProcessFunction<JSONObject, JSONObject() {

@Override
public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
if (value.getString("country").equals("cn")) {
ctx.output(cnTag, value);
} else if (value.getString("country").equals("usa")) {
ctx.output(usaTag, value);
}
out.collect(value);
}
});

//3. 从主流中通过标签获取到侧流
DataStream<JSONObject> cnSideOutput = otherCountry.getSideOutput(cnTag);
DataStream<JSONObject> usaSideOutput = otherCountry.getSideOutput(usaTag);

//4. 输出
otherCountry.print("other::");
cnSideOutput.print("cn::");
usaSideOutput.print("usa::");

CoProcessFunction

两条输入流可以使用此方法来操作,使用processElement1()和processElement2()可以分别操作两条流,这两个方法都可以使用Context对象来调用,context可以访问事件数据,定时器时间戳,TimerService和SideOutputs。CoProcessFunction也提供了onTimer()。

状态编程和容错机制

流式计算分为有状态和无状态,无状态接收记录,并直接计算直接输出记录,有状态则会根据输入记录来维护中间状态,并通过输入记录与状态值来进行计算输出。

有状态算子

在Flink中有两种类型的算子,分别是:算子状态和键控状态。

算子状态的作用范围限定为算子任务,同一并行任务处理的所有数据都可以访问到相同的状态,对同一个任务而言是共享的。算子状态有三种基本数据结构。

1
2
3
4
5
6
7
8
1. 列表状态(List state)
将状态表示为一组数据的列表。

2. 联合列表状态(Union list state)
状态表示为数据的列表。与常规列表状态的区别在于在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

3. 广播状态(Broadcast state)
如果一个算子有多项任务且每项任务状态都相同,此情况最适应用广播状态

键控状态是根据数据流中定义的键来维护和访问的,Flink为每个键值维护一个状态实例,并将键相同的数据都分区到同一个算子任务里,这个任务会维护和处理这个key的对应状态,当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同的key的所有数据都会访问相同的状态。就相当于对数据分了组,并给每个组都一个状态。

Checkpoint 检查点

保证exactly-once,就需要使用检查点在出现故障(网络故障,代码bug等)时将系统重置回到正确状态。检查点可以根据配置周期性地生成快照,将这些快照存储到内存,rocksdb或者HDFS上,一旦出现错误了,那么就会有选择的从快照中进行恢复。

通过如下代码可以开启并配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//1.创建基础的流执行上下文环境,并配置并行度为4
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

//2.开启检查点功能,并配置检查点的创建周期为5秒,设置模式为EXACTLY_ONCE,这也是默认的模式。
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

//3.配置检查点的超时时间是20秒,检查点必须在20秒内完成,或者被丢弃。
env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);

//4.配置最大的并发操作是1,同一时间只允许1个检查点执行。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

//5.配置持久化端为HDFS
env.setStateBackend(new FsStateBackend("hdfs://hadoop001:9820/project_name/flink/checkpoint"));

//6. 其他配置
//......

StateBackend

  1. MemoryStateBackend:默认状态下,State状态会保存在TaskManager的堆内存中,Checkpoint检查点会保存到JobManager的内存中,内存是不稳定的,一旦宕机则会丢失,所以在生产环境下是不建议使用的。

  2. FsStateBackend:State数据保存在TaskManager内存中,执行Checkpoint时,会把State的快照数据保存到配置的文件系统中,可以用本地文件,或者HDFS。

  3. RocksDBStateBackend:在本地文件中维护状态,State状态直接写入本地的RocksDB中,同时需要配置一个远端的文件系统地址,一般是使用HDFS,在进行Checkpoint的时候,会把本地的数据直接复制到远程的HDFS中,故障切换的时候直接从远端的文件系统直接恢复数据到本地。RocksDB使用本地磁盘存储,便无内存的大小限制,又能存储数据到远程文件系统中,但是RocksDB的序列化反序列化存储会有一点性能影响,推荐此方式在生产中使用。

修改存储方式的方法有两种,一种是代码修改,代码如下:

1
2
//修改为hdfs的存储方式
env.setStateBackend(new FsStateBackend("hdfs://hadoop001:9820/project_name/flink/checkpoints"));

在使用RocksDBStateBackend的时候,是需要在pom中引入第三方依赖,然后再在代码中引用:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.6.1</version>
</dependency>
1
env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop001:9820/project_name/flink/checkpoints", true));

另一种是修改flink-conf.yml,修改的参数如下:

1
2
3
4
5
6
7
8
# 文件系统
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

# rocksdb
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

Restart Strategy

Flink支持的重启策略有三种,一种是固定间隔,一种是失败率,还有一种是无重启,没有启动Checkpoint,则使用无重启策略,如果启用了Checkpoint默认策略就是固定间隔策略,允许尝试重启的次数是Integer.MAX_VALUE次。

  1. 重启策略

通过flink-conf.yaml修改:

1
2
3
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

代码中配置:

1
2
3
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
Time.of(10, TimeUnit.SECONDS)
));
  1. 失败率
    通过flink-conf.yaml修改:
    1
    2
    3
    4
    restart-strategy: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: 3
    restart-strategy.failure-rate.failure-rate-interval: 5 min
    restart-strategy.failure-rate.delay: 10 s

代码中配置:

1
2
3
4
5
6
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 最大的失败次数
Time.of(5, TimeUnit.MINUTES), //衡量失败次数的时间段
Time.of(10, TimeUnit.SECONDS) //间隔
));
  1. 无重启
    通过flink-conf.yaml修改:
    1
    restart-strategy: none

代码配置:

1
2
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

Savepoint 保存点

Savepoint是某个时间点程序状态的全局镜像,之后程序升级或者修改配置等情况下还能从保存的状态继续启动恢复。一般存在HDFS上,用户主动触发,是指向Checkpoint的指针,不会过期。

一般建议程序通过uid()方法给算子赋ID,这个配置的ID将确定算子执行的状态范围,只要ID未变就能从保存点恢复程序。

1
2
3
4
5
6
7
8
9
10
DataStreamSource<String> jsonStrStream = env.addSource(kafkaSource);
jsonStrStream.uid("kafka-source-id");

DataStream<JSONObject> jsonObjDS = jsonStrStream.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
return JSON.parseObject(s);
}
}).uid("to-json-obj-id");

如何开启Savepoint功能

修改flink-conf.yaml配置,这样在手动执行命令的时候可以不用指定保存路径

1
state.savepoints.dir: hdfs://hadoop001:8120/project_name/flink/savepoints

如何触发一个savepoint

1
2
3
4
5
6
7
8
# 手动触发保存
flink savepoint jobId [targetDir] [-yid yarnAppId]

# 调用cancel的时候触发savepoint
flink cancel -s [targetDir] jobId [-yid yarnAppId]

# 从指定的savepoint启动job
flink run -s savepointPath [runArgs]