Bootstrap

复杂事件处理简介

复杂事件处理 (Complex Event Progressing,CEP) 是一种基于事件流的处理技术,它将系统数据看作不同类型的事件,通过分析事件间的关系,建立不同的事件关系序列库,利用过滤、关联、聚合、模式匹配等技术,最终由简单事件产生高级事件或商业流程。早在20世纪80年代,SQL的出现通过面向问题的方式取代了面向过程的查询数据方式,率先在数据库中广泛应用起来。90年代,Sybase率先提出触发器(Trigger)的理念,把数据的变更与事件联系起来,但触发器能够处理的数据量和复杂度有限,而且也没有时间序列的概念。2000年左右有厂商开始在这个方向上做一些基于事件和数据流的处理,并且借鉴SQL,希望通过面向问题的方式进行处理,现在已经形成了一个特殊的领域,也就是复杂事件处理(CEP)。

目前CEP系统有很多,功能也各不相同,常见的CEP系统有Esper、Shiddi、Flink、Oracle Event Processing等等,更详细的信息可参见wikipedia()。本文将从什么是CEP、CEP与流式计算、CEP分布式实现等几个方面简单介绍CEP。

一、什么是复杂事件处理

为了直观理解CEP,我们先来看Shiddi所提供的一个官方例子。

public class SimpleFilterSample {
    public static void main(String[] args) throws InterruptedException {
        // Creating Siddhi Manager
        SiddhiManager siddhiManager = new SiddhiManager();

        String siddhiApp = "" +
                "define stream cseEventStream (symbol string, price float, volume long); " +
                "" +
                "@info(name = 'query1') " +
                "from cseEventStream[volume < 150] " +
                "select symbol,price " +
                "insert into outputStream ;";

        // Generating runtime
        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

        // Adding callback to retrieve output events from query
        siddhiAppRuntime.addCallback("query1", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                // EventPrinter.print(timeStamp, inEvents, removeEvents);
                System.out.print(inEvents[0].getData(0) + " ");
            }
        });

        // Retrieving InputHandler to push events into Siddhi
        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");

        // Starting event processing
        siddhiAppRuntime.start();

        // Sending events to Siddhi
        inputHandler.send(new Object[]{"Welcome", 700f, 100L});
        inputHandler.send(new Object[]{"WSO2", 60.5f, 200L});
        inputHandler.send(new Object[]{"to", 50f, 30L});
        inputHandler.send(new Object[]{"IBM", 76.6f, 400L});
        inputHandler.send(new Object[]{"siddhi!", 45.6f, 50L});
        Thread.sleep(500);

        // Shutting down the runtime
        siddhiAppRuntime.shutdown();

        // Shutting down Siddhi
        siddhiManager.shutdown();
    }
}

基于上面的例子,很容易对CEP有个直观的理解,首先定义一个事件流cseEventStream,通过程序不断向这个流中插入事件,同时通过对流中事件的实时处理,过滤调volume < 150的事件,并且插入到outputStream中,通过关联的回调函数对outputStream流中的事件在做进一步处理,整个处理过程完全使用类SQL面向问题方式,不需要针对性的开发过程式处理程序。

尽管CEP有多种不同实现,但总体上讲CEP的特点是通过类SQL、DSL等方式完成对业务逻辑处理的描述,而非开发过程式处理程序。下面再来看个例子:如果一个房间温度在10分钟之内增长超过5度,发送告警。

from every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ] within 10 min 
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp insert into AlertStream;

对于不熟悉CEP的人来讲,这段代码初看起来有点复杂,我们可以简单的这么理解一下,首先有一个叫做TempStream的流存放温度事件,对于每一个进入TempStream流的事件都定义一个10分钟的事件窗口(within 10 min),我们把第一个事件定义为e1,窗口期内e1之后的其他事件我们定义为e2,[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ]表示e1,e2是同一房间产生的事件并且温度增长超过5度,当满足这个条件时,将e1,e2作为一条记录插入到AlertStream流中。

有别与一般的流式处理框架,every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ] 这种独特的表现形式在CEP中被称为模式(Pattern)。

二、CEP vs 流式计算

伴随着Storm、Spark、Flink等流式计算框架的出现,CEP的热度也逐渐上升起来,尽管CEP的出现要早于Storm、Spark、Flink很久。CEP与流式计算有很多相似之处,但两者最大的区别就是CEP是面向问题的,而Storm等流式计算框架是面向过程的,针对具体的问题需要开发大量应用程序。可以认为Storm等流式计算框架可以做为CEP的底层实现,事实上像WSO2这样的产品也就是这样实现的,也就是说当用户定义CEP Query后,WSO2进行Query解析,分解成若干算子后交由Storm集群去处理。

三、分布式CEP

前文中,我们展示了一个单实例的CEP程序并且提及流计算框架可以做为CEP的基础,通常像Storm这类流计算框架是分布式、无状态的,而CEP通常是有状态的,这里我们将进一步讨论CEP遇到分布式、无状态的流计算会遇到哪些问题以及如何处理:

前文中我们提到WSO2基于Storm,接下来以“多种类型的厨房设备将自身设备状态发送给CEP,检测到异常后发送告警通知。”的例子来介绍WSO2如何与Storm结合。

Step1,首先定义一个通过Http形式接受事件的流(DevicePowerStream),DevicePowerStream的并行度是3,可以理解为DevicePowerStream对应一个Strom Bolt,Bolt的并行度是3。

@source(type = ‘http’, receiver.url=‘ ‘, topic = ‘device-power’, @dist(parallel =‘3’) @map(type = ‘json’))
define stream DevicePowerStream (type string, deviceID string, power int);

Step2, 当DevicePowerStream收到数据后,业务上只关注type=‘monitored’的设备数据,所以DevicePowerStream流用于过滤type=‘monitored’的数据。

@info(name = ‘monitored-filter’) @dist(execGroup=‘group1’, parallel =‘3’)
from DevicePowerStream[type == ‘monitored’]
select deviceID, power
insert current events into MonitoredDevicesPowerStream;

Step3, 根据deviceID对MonitoredDevicesPowerStream中的数据做分组,同时定义一个2min的时间窗口,并计算窗口期内power的平均值,写入到AvgPowerStream中,对于AvgPowerStream流的处理则采用了前面提到的CEP Pattern技术,10分钟内出现两次事件值的增幅大于5则认为是异常事件。

@info(name = ‘power-increase-pattern’) @dist(execGroup=‘group2’, parallel =‘3’)
partition with (deviceID of MonitoredDevicesPowerStream) 
begin
    @info(name = ‘avg-calculator’)
    from MonitoredDevicesPowerStream#window.time(2 min) select deviceID, avg(power) as avgPower
    insert current events into #AvgPowerStream;

    @info(name = ‘power-increase-detector’)
    from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
    select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower
    insert current events into RisingPowerStream; 
end;

Step4,而后对异常事件在进行后续过滤处理、发送告警,这里不在详细解释。

从上面的图中可以看到,流之间的数据传递使用了Kafka而非Storm Bolt之间数据的简单传递。这里应用Kafka做为持久化存储主要解决以下问题:

完整示例:

@source(type = 'http', receiver.url=' ', topic = 'device-power', @dist(parallel ='3') @map(type = 'json'))
define stream DevicePowerStream (type string, deviceID string, power int);


@sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}',
@map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))
define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string);


@Store(type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/sp", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver",field.length="symbol:100") 
define table DeviceIdInfoTable (deviceID string, roomID string, autorityContactEmail string);


@info(name = 'monitored-filter') @dist(execGroup='group1', parallel ='3')
from DevicePowerStream[type == 'monitored']
select deviceID, power
insert current events into MonitoredDevicesPowerStream;


@info(name = 'power-increase-pattern') @dist(execGroup='group2', parallel ='3')
partition with (deviceID of MonitoredDevicesPowerStream) 
begin
    @info(name = 'avg-calculator')
    from MonitoredDevicesPowerStream#window.time(2 min) select deviceID, avg(power) as avgPower
    insert current events into #AvgPowerStream;
    
    @info(name = 'power-increase-detector')
    from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
    select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower
    insert current events into RisingPowerStream; 
end;


@info(name = 'power-range-filter') @dist(execGroup='group3', parallel ='1')
from RisingPowerStream[finalPower > 100]
select deviceID, initialPower, finalPower
insert current events into DevicesWithinRangeStream;


@info(name = 'enrich-alert')
@dist(execGroup='group3' ,parallel ='1')
from DevicesWithinRangeStream as s join DeviceIdInfoTable as t
on s.deviceID == t.deviceID
select s.deviceID as deviceID, t.roomID as roomID, s.initialPower as initialPower, s.finalPower as finalPower, t.autorityContactEmail as autorityContactEmail insert current events into AlertStream;

四、总结

值得一提的是,随着流计算的兴起,出现了很多基于流计算开发的类SQL系统,简化流计算开发,比如Kafka KSQL。相比于CEP,这类系统普遍实现了Window算子,但却缺少Sequence、Pattern等高级算子,或者不久之后也会实现。

CEP在IOT、量化交易、风控等多个领域有着广泛的应用,相信以后应用会越来越广,拭目以待吧。