Esper学习之八:EPL语法(四)
- 格式:docx
- 大小:28.37 KB
- 文档页数:14
500 引言复杂事件处理(Complex Event Process)是一组已定义的工具和技术,用于分析和控制驱动现代分布式信息系统的一系列相互关联的复杂事件[1]。
决策支持系统有很多种形式,其中一种就包括复杂事件处理,从简单事件流中推导出复杂决策是CEP提供的基本能力。
简单事件可能触发系统中的状态转换,通过与预定义的事件模式进行对比,可以了解复杂事件的关系。
复杂事件处理引擎将外部收集到的事件流作为输入,并对其进行连续和及时的处理,以关注更高层次的事件中发生了什么。
例如:网络入侵检测系统实时分析网络流量,以确定可能的攻击;环境检测应用处理来自传感器网络的原始数据,以确定污染的程度[2]。
复杂事件处理技术可以作为一种解决方法广泛应用在决策支持,大数据分析等领域。
1 Esper介绍Esper是一种用于复杂事件处理(CEP)和流分析的引擎[3],具有可扩展性强,内存效率高,内存计算,低延迟,高吞吐,实时流处理的特点,用于在线和离线数据的事件分析。
Esper提供了一种事件处理语言(EPL),它是一种用于处理基于时间的高频事件数据的声明性语言,实现和扩展sq l标准,并支持针对事件和时间的丰富表达式。
同时,Esper可以在单机和分布式环境中运行,不依赖外部环境。
Esper能够应用在业务过程管理和自动化、金融、网络和应用程序监控以及传感器网络等领域。
1.1 EPL事件是已经发生的事实,新的事件只能够添加到事件流中,而不能从事件流中移除。
在Esper中,流(Stream)是CEP的主要构建模块。
Esper引擎中的计算执行是以事件流作为基础,并使用标准表达式描述关注的事件模式,在Esper中,标准表达式称为事件处理语言(Event Processing Language)。
EPL语句允许指定关注的事件模式,并将其部署到Esper复杂事件处理引擎上,Esper将流定义为按时间排序的事件序列。
首先,将简单事件流输入到Esper引擎中;随后,Esper引擎执行一系列转换,以确定是否满足感兴趣的事件模式(例如:用于检测违反某条规则)。
expect 语法Expect语法是一种常用的自动化测试工具,用于控制和验证命令行程序的行为。
本文将详细介绍Expect语法的使用方法和注意事项。
一、Expect语法简介Expect语法是基于Tcl脚本语言的一种扩展,专门用于编写自动化测试脚本。
它通过模拟用户与命令行程序的交互过程,可以实现自动化测试和验证命令行程序的正确性。
二、Expect语法的基本结构Expect脚本由一系列的Expect命令组成,每个Expect命令用于匹配和处理命令行程序的输出。
基本的Expect语法如下所示:```expect "pattern" {command1command2...} expect "pattern" {command3command4...}其中,"pattern"用于匹配命令行程序的输出,可以使用正则表达式进行模式匹配。
当匹配成功时,Expect会执行对应的命令。
三、Expect语法的常用命令1. spawn命令:用于启动一个新的命令行程序,并与之建立交互。
```spawn command```2. expect命令:用于匹配命令行程序的输出,并执行对应的命令。
```expect "pattern" {command1command2...}```3. send命令:用于向命令行程序发送命令。
```send "command\r"```4. interact命令:用于交互式地与命令行程序进行交互。
interact```5. timeout命令:用于设置超时时间,当超过指定时间后仍未匹配到输出时,执行对应的命令。
```timeout time {command1command2...}```四、Expect语法的使用示例下面以一个简单的例子来介绍Expect语法的使用。
假设我们要测试一个命令行程序,该程序会提示用户输入用户名和密码,并验证用户的身份。
一,概述关系型数据用来查询相对静态的数据,如果执行一些复杂的查询,要降低查询的频度。
传统关系型数据库普遍将数据存储在硬盘(也有内存数据库),它的检索性能受限于硬盘访问性能。
受制于关系型数据库的设计,如果频繁查询数据来实现实时统计则需要在较短时间内构建多次查询语句(SQL),每次检索都耗时较长,关系型数据库会成为系统瓶颈。
总而言之,传统关系型数据库并不适合每秒成百上千次的查询统计。
Esper引擎可以使用类似于SQL的EPL语句来构建处理模型,处理每秒几万到几十万的实时数据的查询统计。
它的工作有点像倒过来的关系型数据库,它不需要像数据库那样存储数据,而是先构建查询语句,引擎依据这些处理模型实时的输出符合的结果。
而关系型数据要查询语句提交后才会输出结果。
Esper除核心jar包之外还有诸如io,jdbc,jmx等jar。
本文只讲解核心的前几章基础知识。
二,例子先看一个Epser处理实时数据的例子:收集某网站的实时用户访问日志(accesslog),数据字段如下:ip(访客ip)、time(访问时间)、url(页面地址)、httpcode(状态码)、agent(浏览器头信息)、sizeinbytes(数据大小)、等等。
场景1,分析1小时(周期)产生的状态码数量:select httpcode,count(*) as hz from accesslog.win:time_batch(1 hour) group by httpcode otder by hz desc;场景2,分析1小时(周期)访客对url的访问频率:select ip,url,count(*) as hz from accesslog.win:time_batch(1 hour) group by ip,url order by hz desc;场景3,找到可能危险的请求(状态码403 404),分析1小时(周期)访客对url的访问频率:select ip,url,count(*) as hz from accesslog(httpcode in(403,404)).win:time_batch(1 hour) group by ip,url order by hz desc;注:三个例子使用了时间批量窗口,它会收集此时间间隔数据一起统计,周期内只输出一次结果(这样更像是关系型数据库的输出形式),以便于理解。
教程简介Esper是一个事件流处理(ESP)和事件关联引擎(CEP的,复杂事件处理)。
Esper的目标是针对实时事件驱动架构(EDA)。
当Esper监测到事件流中又符合条件的时间发生时,即可触发Plain Old Java Objects(POJO)编写的自定义操作。
当数百万数量级的事件同时发生时,我们不可能使用普通的关系型数据库来存储和查询,Esper正是专为这样的大批量关联事件而设计的。
Esper提供一个定制的事件处理语言(EPL),允许的条件表达丰富的事件,相关性,可能跨越时间窗,从而减少开发工作需要设置一个系统,可以对复杂的情况作出反应。
Esper是一个轻量级的Java编写的内核是完全纳入任何Java进程,JEE应用服务器或基于Java 的企业服务总线嵌入的。
它使应用程序的过程中收到的消息或事件的大量快速发展。
介绍事件流和复杂事件信息是至关重要的作出明智的决定。
这是真实的在现实生活中,而且在计算,并在几个领域,如金融,欺诈检测,商业智能或战场的运作,特别是关键。
从信息流中的消息或事件的形式不同的来源,如给在特定的时间内股票价格对国家的暗示。
这就是说,看着那些离散事件是毫无意义的时间最多。
交易者需要看一段时间内的股票走势可能与其他信息在恰当的时间,最好的交易相结合。
虽然离散事件时看了一个又一个可能是毫无意义的,事件流 - 这是一个无限集合的事件 - 被认为在一个滑动窗口,进一步密切相关,是很有意义的,并用最小的延迟反应对他们是至关重要的有效行动和竞争优势。
简介Esper关系数据库或如JMS消息为基础的系统使之真正难以对付的时空数据和实时查询。
事实上,数据库查询,返回需要明确的有意义的数据,不适合推动,因为它变化的数据。
JMS的系统是无状态的,需要开发人员的时间和实施自己的聚合逻辑。
相比之下,Esper引擎提供了一个更高的抽象与才智,可以被看作是一个数据库天翻地覆式:不是存储数据和运行对存储的数据查询,Esper允许应用程序存储数据的查询和运行通过。
上篇说到了Esper的Context,要是不了解的同学请参看《Esper学习之四:Context》,看过的同学如果还是不理解的话可以给我评论,我将会尽可能的解答。
之前有些同学问我Context和Group by有什么区别,其实如果只是很简单的用Context,那么确实没太大区别,无非是在Context下select可以不包含group by修饰的属性。
但是Group by明显没有Context强大,很多复杂的分组Group by是没法做到的。
不过在能达到同样效果的情况下,我还是建议使用Group by,毕竟Context的名字是不能重复的,而且在高并发的情况下Context会短时间锁住。
至于原因,这已经是Esper的高级篇了,这里暂且不说。
今天开始讲解Esper的重中之重——EPL。
EPL可以说是Esper 的核心,要是不会将简单的业务需求转化为EPL,更别说复杂的EPL了,最后就等着被客户骂吧。
接下来的很多篇都会围绕EPL讲解,大概会有十来篇吧,毕竟英文文档都有140页了,草草两篇根本就说不完。
废话不多说,先简单介绍下什么是EPL,即使第一篇有说过,但是这里有必要细说一下。
EPL,全称Event Processing Language,是一种类似SQL的语言,包含了SELECT, FROM, WHERE, GROUP BY, HAVING 和 ORDER BY子句,同时用事件流代替了table作为数据源,并且能像SQL那样join,filtering和aggregation。
所以如果各位有SQL基础的话,简单的EPL很容易掌握。
除了select,EPL也有insert into,update,delete,不过含义和SQL并不是很接近。
另外还有pattern和output子句,这两个是SQL所没有的。
EPL还定义了一个叫view的东西,类似SQL的table,来决定哪些数据是可用的,Esper提供了十多个view,并且保证这些view可以被重复使用。
epoll学习笔记epoll有两种模式,Edge Triggered(简称ET) 和Level Triggered(简称LT).在采用这两种模式时要注意的是,如果采用ET模式,那么仅当状态发生变化时才会通知,而采用LT模式类似于原来的select/poll操作,只要还有没有处理的事件就会一直通知.以代码来说明问题:首先给出server的代码,需要说明的是每次accept的连接,加入可读集的时候采用的都是ET模式,而且接收缓冲区是5字节的,也就是每次只接收5字节的数据:#include <iostream>#include <sys/socket.h>#include <sys/epoll.h>#include <netinet/in.h>#include <arpa/inet.h>#include <fcntl.h>#include <unistd.h>#include <stdio.h>#include <errno.h>using namespace std;#define MAXLINE 5#define OPEN_MAX 100#define LISTENQ 20#define SERV_PORT 5000#define INFTIM 1000void setnonblocking(int sock){int opts;opts=fcntl(sock,F_GETFL);if(opts<0){perror("fcntl(sock,GETFL)");exit(1);}opts = opts|O_NONBLOCK;if(fcntl(sock,F_SETFL,opts)<0){perror("fcntl(sock,SETFL,opts)");exit(1);}}int main(){int i, maxi, listenfd, connfd, sockfd,epfd,nfds;ssize_t n;char line[MAXLINE];socklen_t clilen;//声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件 struct epoll_event ev,events[20];//生成用于处理accept的epoll专用的文件描述符epfd=epoll_create(256);struct sockaddr_in clientaddr;struct sockaddr_in serveraddr;listenfd = socket(AF_INET, SOCK_STREAM, 0);//把socket设置为非阻塞方式//setnonblocking(listenfd);//设置与要处理的事件相关的文件描述符ev.data.fd=listenfd;//设置要处理的事件类型ev.events=EPOLLIN|EPOLLET;//ev.events=EPOLLIN;//注册epoll事件epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);bzero(&serveraddr, sizeof(serveraddr));serveraddr.sin_family = AF_INET;char *local_addr="127.0.0.1";inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT); serveraddr.sin_port=htons(SERV_PORT);bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));listen(listenfd, LISTENQ);maxi = 0;for ( ; ; ) {//等待epoll事件的发生nfds=epoll_wait(epfd,events,20,500);//处理所发生的所有事件for(i=0;i<nfds;++i){if(events[i].data.fd==listenfd){connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);if(connfd<0){perror("connfd<0");exit(1);}//setnonblocking(connfd);char *str = inet_ntoa(clientaddr.sin_addr);cout << "accapt a connection from " << str << endl;//设置用于读操作的文件描述符ev.data.fd=connfd;//设置用于注测的读操作事件ev.events=EPOLLIN|EPOLLET;//ev.events=EPOLLIN;//注册evepoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); }else if(events[i].events&EPOLLIN){cout << "EPOLLIN" << endl;if ( (sockfd = events[i].data.fd) < 0)continue;if ( (n = read(sockfd, line, MAXLINE)) < 0) {if (errno == ECONNRESET) {close(sockfd);events[i].data.fd = -1;} elsestd::cout<<"readline error"<<std::endl;} else if (n == 0) {close(sockfd);events[i].data.fd = -1;}line[n] = '\0';cout << "read " << line << endl;//设置用于写操作的文件描述符ev.data.fd=sockfd;//设置用于注测的写操作事件ev.events=EPOLLOUT|EPOLLET;//修改sockfd上要处理的事件为EPOLLOUT//epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);}else if(events[i].events&EPOLLOUT){sockfd = events[i].data.fd;write(sockfd, line, n);//设置用于读操作的文件描述符ev.data.fd=sockfd;//设置用于注测的读操作事件ev.events=EPOLLIN|EPOLLET;//修改sockfd上要处理的事件为EPOLINepoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);}}}return 0;}下面给出测试所用的Perl写的client端,在client中发送10字节的数据,同时让client在发送完数据之后进入死循环, 也就是在发送完之后连接的状态不发生改变--既不再发送数据, 也不关闭连接,这样才能观察出server的状态:#!/usr/bin/perluse IO::Socket;my $host = "127.0.0.1";my $port = 5000;my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@";my $msg_out = "1234567890";print $socket $msg_out;print "now send over, go to sleep\n";while (1){sleep(1);}运行server和client发现,server仅仅读取了5字节的数据,而client其实发送了10字节的数据,也就是说,server仅当第一次监听到了EPOLLIN事件,由于没有读取完数据,而且采用的是ET模式,状态在此之后不发生变化,因此server再也接收不到EPOLLIN事件了.(友情提示:上面的这个测试客户端,当你关闭它的时候会再次出发IO可读事件给server,此时server就会去读取剩下的5字节数据了,但是这一事件与前面描述的ET性质并不矛盾.)如果我们把client改为这样:#!/usr/bin/perluse IO::Socket;my $host = "127.0.0.1";my $port = 5000;my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@";my $msg_out = "1234567890";print $socket $msg_out;print "now send over, go to sleep\n";sleep(5);print "5 second gone send another line\n";print $socket $msg_out;while (1){sleep(1);}可以发现,在server接收完5字节的数据之后一直监听不到client的事件,而当client休眠5秒之后重新发送数据,server再次监听到了变化,只不过因为只是读取了5个字节,仍然有10个字节的数据(client第二次发送的数据)没有接收完.如果上面的实验中,对accept的socket都采用的是LT模式,那么只要还有数据留在buffer中,server就会继续得到通知,读者可以自行改动代码进行实验.基于这两个实验,可以得出这样的结论:ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;而LT模式是只要有数据没有处理就会一直通知下去的.补充说明一下这里一直强调的"状态变化"是什么:1)对于监听可读事件时,如果是socket是监听socket,那么当有新的主动连接到来为状态发生变化;对一般的socket而言,协议栈中相应的缓冲区有新的数据为状态发生变化.但是,如果在一个时间同时接收了N个连接(N>1),但是监听socket只accept了一个连接,那么其它未accept的连接将不会在ET模式下给监听socket 发出通知,此时状态不发生变化;对于一般的socket,就如例子中而言,如果对应的缓冲区本身已经有了N字节的数据,而只取出了小于N字节的数据,那么残存的数据不会造成状态发生变化.2)对于监听可写事件时,同理可推,不再详述.而不论是监听可读还是可写,对方关闭socket连接都将造成状态发生变化,比如在例子中,如果强行中断client脚本,也就是主动中断了socket连接,那么都将造成server端发生状态的变化,从而server得到通知,将已经在本方缓冲区中的数据读出.把前面的描述可以总结如下:仅当对方的动作(发出数据,关闭连接等)造成的事件才能导致状态发生变化,而本方协议栈中已经处理的事件(包括接收了对方的数据,接收了对方的主动连接请求)并不是造成状态发生变化的必要条件,状态变化一定是对方造成的.所以在ET模式下的,必须一直处理到出错或者完全处理完毕,才能进行下一个动作,否则可能会发生错误.另外,从这个例子中,也可以阐述一些基本的网络编程概念.首先,连接的两端中,一端发送成功并不代表着对方上层应用程序接收成功, 就拿上面的client测试程序来说,10字节的数据已经发送成功,但是上层的server 并没有调用read读取数据,因此发送成功仅仅说明了数据被对方的协议栈接收存放在了相应的buffer中,而上层的应用程序是否接收了这部分数据不得而知;同样的,读取数据时也只代表着本方协议栈的对应buffer 中有数据可读,而此时时候在对端是否在发送数据也不得而知.。
中秋三天,说闲也不闲,调调工作的代码,倒还解决不少问题。
不过也是因为最近工作忙的缘故,Esper被我冷落不少日子了,趁着今天最后一天,赶紧写一篇出来。
从上一篇开始说EPL的语法,主要是关于注解的。
今天来说说比较常用的语法,Select Clause和From Clause。
这个两个可以说是写EPL必备,要想得到事件流的处理结果,基本上就靠他们俩了(Pattern除外)。
今天的内容比较简单,还请各位同学牢记,以免以后应用的时候花时间看文档或者我的文章。
Select Clause1.查询事件流的所有属性及特定属性EPL的select和SQL的select很相近,SQL用*表示查询表的所有字段,而EPL 用*表示查询事件流的所有属性值。
SQL查询某个字段名,直接在select后跟字段名就ok,EPL也是将要查询的属性名放在select之后。
若查多个属性值,则用逗号分割。
和SQL一样,EPL查询属性也可以设置别名。
示例如下:[plain]view plaincopy1.// EPL:查询完整的User对象2.select * from User3.// 获取User对象er u = newEvent.getUnderlying();5.6.// EPL:查询User的name和id,id别名为i7.select name, id as i from User8.// 获取name和id9.String name = (String)newEvent.get("name");10.int id = (Integer)newEvent.get("i");这里要注意,如果查询的是一个完整对象,需要调用getUnderlying()方法,而get方法是针对确定的属性名或者别名。
另外*是不能设置别名的。
2.表达式除了查询完整对象和特定属性,EPL还支持属性值的计算,以计算后的值作为结果返回,并且也能设置别名。
国庆假期之后的工作周,居然苦逼的只有一个休息日,博客没写成不说,打球还把脚给扭了。
不仅如此,这周开始疯狂加班了,所以今天这篇拖了又拖。
关于EPL,已经写了三篇了,预估计了一下,除了今天这篇,后面还有5篇左右。
大家可别嫌多,官方的文档对EPL的讲解有将近140页,我已经尽量将废话都干掉了,再配合我附上的例子,看我的10篇文章比那140页英文文档肯定舒服多了吧。
也请各位原谅我一周一篇的速度,毕竟我还要学习,生活,工作,一个都不能少。
今天讲解的内容包括三块:Order by,Limit,Insert into。
大家会SQL的应该很熟悉这三个东西,前两个比较简单,Insert into会有一些差别,篇幅也相对多些。
1.Order byEPL的Order by和SQL的几乎一模一样,作用都是对输出结果进行排序,但是也有一些需要注意的地方。
语法如下:[plain]view plaincopy1.order by expression [asc | desc] [, expression [asc |desc]] [, ...]expreession表示要排序的字段,asc表示升序排列(从小到大),desc表示降序排列(从大到小)。
举个例子:[plain]view plaincopy1.// 每进入5个事件输出一次,并且先按照name升序排列,再按照age降序排列。
2.select * from User output every 5 events order by name, age desc使用方法很简单,除了和SQL相似的特点外,还有他自己需要注意的几点:a. 如果不特别说明是升序还是降序,默认情况下按照升序排列。
b. 如果order by的子句中出现了聚合函数,那么该聚合函数必须出现在select 的子句中。
c. 出现在select中的expression或者在select中定义的expression,在order by中也有效。
d. 如果order by所在的句子没有join或者没有group by,则排序结果幂等,否则为非幂等。
2. LimitLimit在EPL中和在SQL中也基本一样,不过SQL中是用具体的数字来表示限制范围,而EPL可以是常量或者变量来表示限制范围。
语法如下:[plain]view plaincopy1.limit row_count [offset offset_count]row_count表示输出多少行,可以是一个整型常量,也可以是一个整型变量,以方便运行时修改。
offset_count表示在当前结果集中跳过n行然后再输出,同样也可以是一个整型变量。
如果不使用此参数,则表示跳过0行,即从第一行输出。
举例如下:[plain]view plaincopy1.// 输出结果集的第3行到第10行2.select uri, count(*) from WebEvent group by uri outputsnapshot every 1 minute order by count(*) desc limit8 offset 2除了以上的语法,limit还有一种简化的写法,实际上是参照SQL的标准。
[plain]view plaincopy1.limit offset_count[, row_count]两个参数的含义和上面的一样,并且我们将上面的例子改写一下:[plain]view plaincopy1.// 输出结果集的第3行到第10行2.select uri, count(*) from WebEvent group by uri outputsnapshot every 1 minute order by count(*) desc limit2, 8如果这个两个参数是负数会怎么样呢?row_count为负数,则无限制输出,若为0,则不输出。
当row_count是变量表示并且变量为null,则无限制输出。
offset _count是不允许负数的,如果是变量表示,并且变量值为null或者负数,则EPL会把他假设为0。
3. Insert into3.1 简单用法EPL的Insert into和SQL的有比较大的区别。
SQL是往一张表里插入数据,而EPL是把一个事件流的计算结果放入另一个事件流,然后可以对这个事件流进行别的计算。
所以Insert into的一个好处就是可以将是事件流的计算结果不断级联,对于那种需要将上一个业务的结果数据放到下一个业务处理的场景再适合不过了。
除此之外,Insert into还有合并多个计算结果的作用。
到这里相信大家已经对他越来越好奇了,不急,咱们先来看看语法:[plain]view plaincopy1.insert [istream | irstream | rstream] into event_stream_name [ (property_name [, property_name] ) ]event_stream_name定义了事件流的名称,在执行完insert的定义之后,我们可以使用select对这个事件流进行别的计算。
istream | irstream | rstream表示该事件流允许另一个事件的输入/输入和输出/输出数据能够进入(解释好像很绕。
一会儿看例子就能明白了)property_name表示该事件流里包含的属性名称,多个属性名之间用逗号分割,并且用小括号括起来。
上面的说明可能不是很好理解,咱们先看个例子:[plain]view plaincopy1.// 将新进入的Asus事件传递到Computer,且Asus的id,size和Computer的cid,csize对应2.insert into Computer(cid,csize) select id,size from Asus3.4.// 第二种写法5.insert into Computer select id as cid, size as csizeAsus从例子中可以看到,insert into需要配合select进行使用,以表明前一个事件流有哪些计算结果将进入insert into定义的事件流。
并且在select中的字段要和insert里的事件流的属性要对应(这里指的对应是数据类型对应,而且属性数量也必须一样)。
如果说insert定义的事件流名称在之前已经定义过(insert into中定义的除外),重名是不允许的。
我个人推荐第二种写法,通过as设置的别名即为insert定义的事件流的属性,这样可以避免属性的个数不一致的错误。
刚才说了istream | irstream | rstream的用法,可能有点表述不清楚,这里看一个完整的例子。
[java]view plaincopy1./**2.*3.* @author luonanqin4.*5.*/6.class Asus7.{8.private int id;9.private int size;10.11. public int getId()12. {13. return id;14. }15.16. public void setId(int id)17. {18. this.id = id;19. }20.21.22. public int getSize()23. {24. return size;25. }26.27. public void setSize(int size)28. {29. this.size = size;30. }31.32. public String toString()33. {34. return "id: " + id + ", size: " + size;35. }36.}37.38.39.class InsertRstreamListener implements UpdateListener40.{41. public void update(EventBean[] newEvents, EventBean[] oldEvents)42. {43. if (newEvents != null)44. {45. for (int i = 0; i < newEvents.length; i++)46. {47. Object id = newEvents[i].get("cid");48. System.out.println("Insert Asus: cid: " + id);49. }50. }51. if (oldEvents != null)52. {53. for (int i = 0; i < oldEvents.length; i++)54. {55. Object id = oldEvents[i].get("cid");56. System.out.println("Remove Asus: cid: " + id);57. }58. }59. System.out.println();60. }61.}62.63.public class InsertRstreamTest {64.65. public static void main(String[] args) throws InterruptedException {66. EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();67.68. EPAdministrator admin = epService.getEPAdministrator();69.70. String asus = Asus.class.getName();71. String insertEPL = "insert rstream into Computer(cid,csize) select id,size from " + asus + ".win :length(1)";72. String insertSelectEPL = "select cid fromComputer.win:length_batch(2)";73.74. EPStatement state = admin.createEPL(insertEPL);75. EPStatement state1 = admin.createEPL(insertSelectEPL);76. state1.addListener(new InsertRstreamListener());77.78. EPRuntime runtime = epService.getEPRuntime();79.80. Asus apple1 = new Asus();81. apple1.setId(1);82. apple1.setSize(1);83. System.out.println("Send Asus: " + apple1);84. runtime.sendEvent(apple1);85.86. Asus apple2 = new Asus();87. apple2.setId(2);88. apple2.setSize(1);89. System.out.println("Send Asus: " + apple2);90. runtime.sendEvent(apple2);91.92. Asus apple3 = new Asus();93. apple3.setId(3);94. apple3.setSize(3);95. System.out.println("Send Asus: " + apple3);96. runtime.sendEvent(apple3);97.98. Asus apple4 = new Asus();99. apple4.setId(4);100.apple4.setSize(4);101.System.out.println("Send Asus: " + app le4);102.runtime.sendEvent(apple4);103.104.Asus apple5 = new Asus();105.apple5.setId(5);106.apple5.setSize(3);107.System.out.println("Send Asus: " + app le5);108.runtime.sendEvent(apple5);109.110.Asus apple6 = new Asus();111.apple6.setId(6);112.apple6.setSize(4);113.System.out.println("Send Asus: " + app le6);114.runtime.sendEvent(apple6);115.}116.}执行结果:[plain]view plaincopy1.Send Asus: id: 1, size: 12.Send Asus: id: 2, size: 13.Send Asus: id: 3, size: 34.Insert Asus: cid: 15.Insert Asus: cid: 26.7.Send Asus: id: 4, size: 48.Send Asus: id: 5, size: 39.Insert Asus: cid: 310.Insert Asus: cid: 411.12.Send Asus: id: 6, size: 4这个例子中,insertEPL表示当Asus事件从length为1的view 中移除时,把移除的事件放入Computer。