热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

精通ApacheFlink读书笔记--5

5、复杂事件处理(CEP)5.1、什么是CEP?CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时

5、复杂事件处理(CEP)

5.1、什么是CEP?

CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。

CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。

CEP的工作流图:

这里写图片描述

看起来很简单,但是它有很多不同的功能:

1、输入的流数据,尽快产生结果
2、在2event流上,基于时间进行聚合类的计算
3、提供实时/准实时的警告和通知
4、在多样的数据源中产生关联并分析模式
5、高吞吐、低延迟的处理

市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的library支持。但是Flink提供了专门的CEP library。

Flink为CEP提供了专门的Flink CEP library,它包含如下组件:

1Event Stream
2、pattern定义
3、pattern检测
4、生成Alert

这里写图片描述

首先,开发人员要在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警。

为了使用Flink CEP,我们需要导入依赖:

<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-cep_2.10artifactId>
<version>1.2.0version>
dependency>

5.2.1、Event Streams

我们首先需要为Stream Event设计java pojo,但是注意,由于要对event对象进行对比,所以我们需要重写hashCode()方法和equals()方法。下面进行监控温度事件流。

创建抽象类MonitoringEvent,重写hashCode()和equals()方法;再创建POJO:TemperatureEvent,同样重写hashCode()和equals()方法:
MonitoringEvent:

package flink.cep;


public abstract class MonitoringEvent {
private String machineName;

public String getMachineName() {
return machineName;
}

public void setMachineName(String machineName) {
this.machineName = machineName;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());

return result;
}

@Override
public boolean equals(Object obj) {
if(this == obj) return true;
if(obj == null) return false;
if(getClass() != obj.getClass()) return false;
MonitoringEvent other = (MonitoringEvent) obj;
if(machineName == null) {
if(other.machineName != null) {
return false;
}else if(!machineName.equals(other.machineName)) {
return false;
}
}
return true;
}

public MonitoringEvent(String machineName) {
super();
this.machineName = machineName;
}
}

TemperatureEvent:

package flink.cep;


public class TemperatureEvent extends MonitoringEvent{

public TemperatureEvent(String machineName) {
super(machineName);
}

private double temperature;

public double getTemperature() {
return temperature;
}

public void setTemperature(double temperature) {
this.temperature = temperature;
}

@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
long temp;
temp = Double.doubleToLongBits(temperature);
result = (int) (prime * result +(temp ^ (temp >>> 32)));

return result;
}

@Override
public boolean equals(Object obj) {
if(this == obj) return true;
if(!super.equals(obj)) return false;
if(getClass() != obj.getClass()) return false;

TemperatureEvent other = (TemperatureEvent) obj;
if(Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature)) return false;
return true;
}

@Override
public String toString() {
return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName=" + getTemperature() + "]";
}

public TemperatureEvent(String machineName, double temperature) {
super(machineName);
this.temperature = temperature;
}


}

创建env,创建source:

package temp;


import flink.cep.TemperatureEvent;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {

public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream inputEventStream = env.fromElements(
new TemperatureEvent("xyz",22.0),
new TemperatureEvent("xyz",20.1), new TemperatureEvent("xyz",21.1),
new TemperatureEvent("xyz",22.2), new TemperatureEvent("xyz",22.1),
new TemperatureEvent("xyz",22.3), new TemperatureEvent("xyz",22.1),
new TemperatureEvent("xyz",22.4), new TemperatureEvent("xyz",22.7),
new TemperatureEvent("xyz",27.0));
}
}

5.2.2、Pattern API

每个Pattern都应该包含几个步骤,或者叫做state。从一个state到另一个state,通常我们需要定义一些条件,例如下列的代码:

DataStream input = ...

Pattern pattern = Pattern.begin("start").where(evt -> evt.getId() == 42)
.next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
.followedBy("end").where(evt -> evt.getName().equals("end"));

PatternStream patternStream = CEP.pattern(input, pattern);

DataStream result = patternStream.select(pattern -> {
return createAlertFrom(pattern);
});

这里写图片描述

每个state都应该有一个标示:

Pattern start = Pattern.begin("start");

每个state都需要有一个唯一的名字,而且需要一个filter来过滤条件:

start.where(new FilterFunction() {
@Override
public boolean filter(Event value) {
return ... // some condition
}
});

我们也可以通过subtype来限制event的子类型:

start.subtype(SubEvent.class).where(new FilterFunction() {
@Override
public boolean filter(SubEvent value) {
return ... // some condition
}
});

事实上,你可以多次调用subtype和where方法;而且如果where条件是不相关的,你可以通过or来指定一个单独的filter函数:

pattern.where(new FilterFunction() {
@Override
public boolean filter(Event value) {
return ... // some condition
}
}).or(new FilterFunction() {
@Override
public boolean filter(Event value) {
return ... // or condition
}
});

之后,我们可以在此条件基础上,通过next或者followedBy方法切换到下一个state,next()的意思是说上一步符合条件的元素之后紧挨着的元素;而followedBy并不要求一定是挨着的元素。这两者分别称为严格近邻和非严格近邻。

Pattern<Event, ?> strictNext = start.next("middle");
Pattern nOnStrictNext= start.followedBy("middle");

最后,我们可以将所有的Pattern的条件限定在一定的时间范围内:

next.within(Time.seconds(10));

这个时间可以是processing time,也可以是Event time。

5.2.3、Pattern 检测

通过一个input DataStream以及刚刚我们定义的Pattern,我们可以创建一个PatternStream:

DataStream input = ...
Pattern pattern = ...

PatternStream patternStream = CEP.pattern(input, pattern);

一旦获得PatternStream,我们就可以通过select或flatSelect,从一个Map序列找到我们需要的告警信息。

5.2.3.1、select

select方法需要实现一个PatternSelectFunction,通过select方法来输出需要的警告。它接受一个Map对,包含string/event,其中key为state的名字,event则为真是的Event。

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
@Override
public OUT select(Map pattern) {
IN startEvent = pattern.get("start");
IN endEvent = pattern.get("end");
return new OUT(startEvent, endEvent);
}
}

其返回值仅为1条记录。

5.2.3.2、flatSelect

通过实现PatternFlatSelectFunction,实现与select相似的功能。唯一的区别就是flatSelect方法可以返回多条记录。

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
@Override
public void select(Map pattern, Collector collector) {
IN startEvent = pattern.get("start");
IN endEvent = pattern.get("end");

for (int i = 0; i collector.collect(new OUT(startEvent, endEvent));
}
}
}

5.2.4、超时事件的处理

通过within方法,我们的parttern规则限定在一定的窗口范围内。当有超过窗口时间后还到达的event,我们可以通过在select或flatSelect中,实现PatternTimeoutFunction/PatternFlatTimeoutFunction来处理这种情况。

PatternStream patternStream = CEP.pattern(input, pattern);

DataStream> result = patternStream.select(
new PatternTimeoutFunction() {...},
new PatternSelectFunction() {...}
);

DataStream> flatResult = patternStream.flatSelect(
new PatternFlatTimeoutFunction() {...},
new PatternFlatSelectFunction() {...}
);

5.3、例子

我们继续最开始时的温度检测的例子。

我们创建一个Alert类,表示在满足一定的pattern条件后,需要告警的内容:

package flink.cep;


public class Alert {

private String message;

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public Alert(String message) {
this.message = message;
}

@Override
public String toString() {
return "Alert [message=" + message + "]";
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((message == null) ? 0 : message.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if(this == obj) return true;
if(obj == null) return false;
if(getClass() != obj.getClass()) return false;

Alert other = (Alert) obj;

if(message == null) {
if(other.message != null) {
return false;
}else if(!message.equals(other.message)) {
return false;
}
}
return true;
}
}

最后,我们定义一个Pattern:当Event的温度超过26度时,立刻产生一个Alert信息,最终实现如下:

import flink.cep.Alert;
import flink.cep.TemperatureEvent;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Map;

public class Test {

public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// DataStream : source
DataStream inputEventStream = env.fromElements(new TemperatureEvent("xyz",22.0),
new TemperatureEvent("xyz",20.1), new TemperatureEvent("xyz",21.1),
new TemperatureEvent("xyz",22.2), new TemperatureEvent("xyz",22.1),
new TemperatureEvent("xyz",22.3), new TemperatureEvent("xyz",22.1),
new TemperatureEvent("xyz",22.4), new TemperatureEvent("xyz",22.7),
new TemperatureEvent("xyz",27.0), new TemperatureEvent("xyz",30.0));

// 定义Pattern,检查10秒钟内温度是否高于26度
Pattern warningPattern = Pattern.begin("start")
.subtype(TemperatureEvent.class)
.where(new FilterFunction() {
private static final long serialVersiOnUID= 1L;

@Override
public boolean filter(TemperatureEvent value) throws Exception {
if(value.getTemperature() >= 26.0){
return true;
}
return false;
}
})
.within(Time.seconds(10));

//匹配pattern并select事件,符合条件的发生警告,即输出
DataStream patternStream = CEP.pattern(inputEventStream, warningPattern)
.select(new PatternSelectFunction() {
private static final long serialVersiOnUID= 1L;

@Override
public Alert select(Map event) throws Exception {
return new Alert("Temperature Rise Detected: " + event.get("start").getTemperature() + " on machine name: " + event.get("start").getMachineName());
}
});

patternStream.print();

env.execute("CEP on Temperature Sensor");
}
}

这个pattern非常简单,只要TemperatureEvent中有超过26度的记录,就发出一条警告。

5.4、总结

Flink CEP作用于DataStream上,定义pattern,即规则,当触发这些规则时,给出警告。

这里有一个更加复杂的例子供参考:

cep-monitoring。

这个例子是根据Flink CEP library来监控数据中心中每个机柜的温度。当在一定的时间内,如果有2个连续的Event中的温度超过设置的阈值时,就产生一条警告;一条警告也许还不是很坏的结果,但是如果我们在同一个机柜上连续看到2条这种警告,这种情况比较严重了。所以根据第一个警告流的输出,通过定义另一个Pattern,以上一步的输出作为第二个pattern的输入,来定义一个“严重”的问题。

下一节我们将简要介绍FlinkML library。


推荐阅读
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 模板引擎StringTemplate的使用方法和特点
    本文介绍了模板引擎StringTemplate的使用方法和特点,包括强制Model和View的分离、Lazy-Evaluation、Recursive enable等。同时,还介绍了StringTemplate语法中的属性和普通字符的使用方法,并提供了向模板填充属性的示例代码。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 开发笔记:Java是如何读取和写入浏览器Cookies的
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Java是如何读取和写入浏览器Cookies的相关的知识,希望对你有一定的参考价值。首先我 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • Java程序设计第4周学习总结及注释应用的开发笔记
    本文由编程笔记#小编为大家整理,主要介绍了201521123087《Java程序设计》第4周学习总结相关的知识,包括注释的应用和使用类的注释与方法的注释进行注释的方法,并在Eclipse中查看。摘要内容大约为150字,提供了一定的参考价值。 ... [详细]
  • 本文介绍了Java中Hashtable的clear()方法,该方法用于清除和移除指定Hashtable中的所有键。通过示例程序演示了clear()方法的使用。 ... [详细]
author-avatar
141qws_330
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有