概要
Flink的适用场景以及如何使用
什么是FLink
一句话总结,Flink就是一个分布式,高可用,高性能的流处理框架。
主要构造
-
checkpoint:基于chandy-lamport算法实现分布式计算任务的一致性语义;
-
state:flink中的状态机制,flink天生支持state,state可以认为程序的中间计算结果或者是历史计算结果;
-
time:flink中支持基于事件时间和处理时间进行计算,spark streaming只能按照process time进行处理;基于事件时间的计算我们可以解决数据迟到和乱序等问题。
-
window:flink提供了更多丰富的window,基于时间,基于数量,session window,同样支持滚动和滑动窗口的计算。
这个看起来没那么浅显易懂,用个很容易理解的栗子来说,Flink类似于一个多线程环境,每个的处理都是独立的,在每个处理里面他包含有自己的一系列机制,可以存储一些执行状态,定时任务,窗口计算等功能。
理论结合实际来打个比方,你现在有一个水池,连接了很多根水管,当水从水池流向水管的时候,水管里面的水就是相对独立的一个执行环境了,至于是这个水是被使用,还是被浪费,对于其他的水管没有任何影响。每个水管都有自己独立的开关,你可以控制他什么时候开什么时候关。但是需要注意的一点是,每根管子的构造是一模一样的,所以他们的流程是一样的,只是会因为状态的不一样而处理方式不同
何时应用,如何应用
当你的数据量很大,比如有些数据推送,每几秒都会给你推送,然后还分了很多个渠道给你推送,你对这些消息中的某些值比较关心,有些还需要做延迟处理的,使用flink那真是赶上巧了。flink的高性能处理,可以很好的满足你数据量大的问题,其中的state,timer等也会对你的每一条消息负责。
springboot使用flink还算比较简单
引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.13.2</version>
</dependency>
添加配置
@Component
public class SignalInit {
private static String topicNameSource;
private static String topicGroupSource;
private static String mqAddressSource;
@Value("${custom-rocketmq-source.attribute-id-topic-name}")
public void setHeatTopicSource(String heatTopicName) {
billetTopicNameSource = heatTopicName;
}
@Value("${rocketmq.name-server}")
public void setMqAddressSource(String mqAddressSourceAndPort) {
mqAddressSource = mqAddressSourceAndPort;
}
@Value("${custom-rocketmq-source.attribute-ids-topic-group}")
public void setHeatTopicGroupSource(String heatTopicGroup) {
heatTopicGroupSource = heatTopicGroup;
}
public static void init() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties consumerProps = getConsumerProps();
//获取配置信息
SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();
DataStream<Tuple2<String, String>> source =
env.addSource(new RocketMQSource<>(schema, consumerProps));
//获取mq发过来的数据转成IotEvent对象
DataStream<IotEvent> attributeEventStreamSource =
source.flatMap(new FilterCommonTuple2ToIotEventFlatMap());
// 分组处理
attributeEventStreamSource
.keyBy(IotEvent::getPushId)
.process(new DealSignalKeyedProcess());
try {
env.execute("SignalInit");
} catch (Exception e) {
e.printStackTrace();
}
}
private static Properties getConsumerProps() {
Properties consumerProps = new Properties();
consumerProps.setProperty(
RocketMQConstant.NAME_SERVER_ADDR,
mqAddressSource);
consumerProps.setProperty(RocketMQConstant.CONSUMER_GROUP, heatTopicGroupSource);
consumerProps.setProperty(RocketMQConstant.CONSUMER_TOPIC, billetTopicNameSource);
return consumerProps;
}
上面代码是Flink配合RokectMq对mq发送过来的数据做的一个处理,通过获取mq发送的消息转换成一个Event对象(这个Event对象自己根据mq发送过来的消息实体新建),最后生成一个Flink的流处理(new DealSignalKeyedProcess())开始执行,在DealSignalKeyedProcess会对你发过来的信号进行具体的处理。
如下是DealSignalKeyedProcess方法中大概需要实现的东西文章来源:https://www.toymoban.com/news/detail-804720.html
@Slf4j
public class DealSignalKeyedProcess extends KeyedProcessFunction<String, IotEvent, String> {
ApplicationContext context;
//这里就使用到了Flink的状态机制,里面存放是数据可以是基本数据类型,也可以是一个对象,其大致的使用方式为 f赋值: timeTsState.update(1.0),获取值timeTsState。value()
ValueState<Long> timeTsState;
ValueState<IotEvent> iotEventStatus;
public DealSignalKeyedProcess() {
}
/**
* open中将会对前面定义的属性进行一个简单的初始化
*/
@Override
public void open(Configuration parameters) throws Exception {
context = SpringUtil.getApplicationContext();
timeTsState = getRuntimeContext().getState(new ValueStateDescriptor<>("times-ts", Long.class));
iotEventStatus = getRuntimeContext().getState(new ValueStateDescriptor<>("iotEvent", IotEvent.clas
}
/**
* processElement为业务的主要执行方法,在这个方法中对接收到的Event来进行处理
* 其中可以声明一个定时器,在多少时间后执行,然后销毁定时器,下方将有一个简单的举例
*/
@Override
public void processElement(IotEvent value, Context ctx, Collector<String> out) {
iotEventStatus.update(value);
//....
if (timeTsState.value() == null) {
Long timestamp = sourceMsgTime + (timesUnitCode == 1 ? durationTime : durationTime * 60) * 1000L;
//创建持续时长后的定时器
ctx.timerService().registerProcessingTimeTimer(timestamp);
timeTsState.update(timestamp);
}
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, IotEvent, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
//这里为达到该定时时间时的具体业务实现...
}
}
小结
很多细节的东西网上都有讲,比如flink的架构体系,流程图等,很多东西其实就在于一个了解,知道在什么时候用什么方法能够提升效率,有一个大概的技术框架感觉就能很轻松的干活了。纯个人想法,不喜勿喷。文章来源地址https://www.toymoban.com/news/detail-804720.html
到了这里,关于Flink的简单使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!