Esper学习之十:EPL语法(六)
- 格式:docx
- 大小:36.13 KB
- 文档页数:15
一,概述关系型数据用来查询相对静态的数据,如果执行一些复杂的查询,要降低查询的频度。
传统关系型数据库普遍将数据存储在硬盘(也有内存数据库),它的检索性能受限于硬盘访问性能。
受制于关系型数据库的设计,如果频繁查询数据来实现实时统计则需要在较短时间内构建多次查询语句(SQL),每次检索都耗时较长,关系型数据库会成为系统瓶颈。
总而言之,传统关系型数据库并不适合每秒成百上千次的查询统计。
Esper引擎可以使用类似于SQL的EPL语句来构建处理模型,处理每秒几万到几十万的实时数据的查询统计。
它的工作有点像倒过来的关系型数据库,它不需要像数据库那样存储数据,而是先构建查询语句,引擎依据这些处理模型实时的输出符合的结果。
而关系型数据要查询语句提交后才会输出结果。
Esper除核心jar包之外还有诸如io,jdbc,jmx等jar。
本文只讲解核心的前几章基础知识。
二,例子先看一个Epser处理实时数据的例子:收集某网站的实时用户访问日志(accesslog),数据字段如下:ip(访客ip)、time(访问时间)、url(页面地址)、httpcode(状态码)、agent(浏览器头信息)、sizeinbytes(数据大小)、等等。
场景1,分析1小时(周期)产生的状态码数量:select httpcode,count(*) as hz from accesslog.win:time_batch(1 hour) group by httpcode otder by hz desc;场景2,分析1小时(周期)访客对url的访问频率:select ip,url,count(*) as hz from accesslog.win:time_batch(1 hour) group by ip,url order by hz desc;场景3,找到可能危险的请求(状态码403 404),分析1小时(周期)访客对url的访问频率:select ip,url,count(*) as hz from accesslog(httpcode in(403,404)).win:time_batch(1 hour) group by ip,url order by hz desc;注:三个例子使用了时间批量窗口,它会收集此时间间隔数据一起统计,周期内只输出一次结果(这样更像是关系型数据库的输出形式),以便于理解。
教程简介Esper是一个事件流处理(ESP)和事件关联引擎(CEP的,复杂事件处理)。
Esper的目标是针对实时事件驱动架构(EDA)。
当Esper监测到事件流中又符合条件的时间发生时,即可触发Plain Old Java Objects(POJO)编写的自定义操作。
当数百万数量级的事件同时发生时,我们不可能使用普通的关系型数据库来存储和查询,Esper正是专为这样的大批量关联事件而设计的。
Esper提供一个定制的事件处理语言(EPL),允许的条件表达丰富的事件,相关性,可能跨越时间窗,从而减少开发工作需要设置一个系统,可以对复杂的情况作出反应。
Esper是一个轻量级的Java编写的内核是完全纳入任何Java进程,JEE应用服务器或基于Java 的企业服务总线嵌入的。
它使应用程序的过程中收到的消息或事件的大量快速发展。
介绍事件流和复杂事件信息是至关重要的作出明智的决定。
这是真实的在现实生活中,而且在计算,并在几个领域,如金融,欺诈检测,商业智能或战场的运作,特别是关键。
从信息流中的消息或事件的形式不同的来源,如给在特定的时间内股票价格对国家的暗示。
这就是说,看着那些离散事件是毫无意义的时间最多。
交易者需要看一段时间内的股票走势可能与其他信息在恰当的时间,最好的交易相结合。
虽然离散事件时看了一个又一个可能是毫无意义的,事件流 - 这是一个无限集合的事件 - 被认为在一个滑动窗口,进一步密切相关,是很有意义的,并用最小的延迟反应对他们是至关重要的有效行动和竞争优势。
简介Esper关系数据库或如JMS消息为基础的系统使之真正难以对付的时空数据和实时查询。
事实上,数据库查询,返回需要明确的有意义的数据,不适合推动,因为它变化的数据。
JMS的系统是无状态的,需要开发人员的时间和实施自己的聚合逻辑。
相比之下,Esper引擎提供了一个更高的抽象与才智,可以被看作是一个数据库天翻地覆式:不是存储数据和运行对存储的数据查询,Esper允许应用程序存储数据的查询和运行通过。
上篇说到了Esper的Context,要是不了解的同学请参看《Esper学习之四:Context》,看过的同学如果还是不理解的话可以给我评论,我将会尽可能的解答。
之前有些同学问我Context和Group by有什么区别,其实如果只是很简单的用Context,那么确实没太大区别,无非是在Context下select可以不包含group by修饰的属性。
但是Group by明显没有Context强大,很多复杂的分组Group by是没法做到的。
不过在能达到同样效果的情况下,我还是建议使用Group by,毕竟Context的名字是不能重复的,而且在高并发的情况下Context会短时间锁住。
至于原因,这已经是Esper的高级篇了,这里暂且不说。
今天开始讲解Esper的重中之重——EPL。
EPL可以说是Esper 的核心,要是不会将简单的业务需求转化为EPL,更别说复杂的EPL了,最后就等着被客户骂吧。
接下来的很多篇都会围绕EPL讲解,大概会有十来篇吧,毕竟英文文档都有140页了,草草两篇根本就说不完。
废话不多说,先简单介绍下什么是EPL,即使第一篇有说过,但是这里有必要细说一下。
EPL,全称Event Processing Language,是一种类似SQL的语言,包含了SELECT, FROM, WHERE, GROUP BY, HAVING 和 ORDER BY子句,同时用事件流代替了table作为数据源,并且能像SQL那样join,filtering和aggregation。
所以如果各位有SQL基础的话,简单的EPL很容易掌握。
除了select,EPL也有insert into,update,delete,不过含义和SQL并不是很接近。
另外还有pattern和output子句,这两个是SQL所没有的。
EPL还定义了一个叫view的东西,类似SQL的table,来决定哪些数据是可用的,Esper提供了十多个view,并且保证这些view可以被重复使用。
epoll学习笔记epoll有两种模式,Edge Triggered(简称ET) 和Level Triggered(简称LT).在采用这两种模式时要注意的是,如果采用ET模式,那么仅当状态发生变化时才会通知,而采用LT模式类似于原来的select/poll操作,只要还有没有处理的事件就会一直通知.以代码来说明问题:首先给出server的代码,需要说明的是每次accept的连接,加入可读集的时候采用的都是ET模式,而且接收缓冲区是5字节的,也就是每次只接收5字节的数据:#include <iostream>#include <sys/socket.h>#include <sys/epoll.h>#include <netinet/in.h>#include <arpa/inet.h>#include <fcntl.h>#include <unistd.h>#include <stdio.h>#include <errno.h>using namespace std;#define MAXLINE 5#define OPEN_MAX 100#define LISTENQ 20#define SERV_PORT 5000#define INFTIM 1000void setnonblocking(int sock){int opts;opts=fcntl(sock,F_GETFL);if(opts<0){perror("fcntl(sock,GETFL)");exit(1);}opts = opts|O_NONBLOCK;if(fcntl(sock,F_SETFL,opts)<0){perror("fcntl(sock,SETFL,opts)");exit(1);}}int main(){int i, maxi, listenfd, connfd, sockfd,epfd,nfds;ssize_t n;char line[MAXLINE];socklen_t clilen;//声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件 struct epoll_event ev,events[20];//生成用于处理accept的epoll专用的文件描述符epfd=epoll_create(256);struct sockaddr_in clientaddr;struct sockaddr_in serveraddr;listenfd = socket(AF_INET, SOCK_STREAM, 0);//把socket设置为非阻塞方式//setnonblocking(listenfd);//设置与要处理的事件相关的文件描述符ev.data.fd=listenfd;//设置要处理的事件类型ev.events=EPOLLIN|EPOLLET;//ev.events=EPOLLIN;//注册epoll事件epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);bzero(&serveraddr, sizeof(serveraddr));serveraddr.sin_family = AF_INET;char *local_addr="127.0.0.1";inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT); serveraddr.sin_port=htons(SERV_PORT);bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));listen(listenfd, LISTENQ);maxi = 0;for ( ; ; ) {//等待epoll事件的发生nfds=epoll_wait(epfd,events,20,500);//处理所发生的所有事件for(i=0;i<nfds;++i){if(events[i].data.fd==listenfd){connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);if(connfd<0){perror("connfd<0");exit(1);}//setnonblocking(connfd);char *str = inet_ntoa(clientaddr.sin_addr);cout << "accapt a connection from " << str << endl;//设置用于读操作的文件描述符ev.data.fd=connfd;//设置用于注测的读操作事件ev.events=EPOLLIN|EPOLLET;//ev.events=EPOLLIN;//注册evepoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); }else if(events[i].events&EPOLLIN){cout << "EPOLLIN" << endl;if ( (sockfd = events[i].data.fd) < 0)continue;if ( (n = read(sockfd, line, MAXLINE)) < 0) {if (errno == ECONNRESET) {close(sockfd);events[i].data.fd = -1;} elsestd::cout<<"readline error"<<std::endl;} else if (n == 0) {close(sockfd);events[i].data.fd = -1;}line[n] = '\0';cout << "read " << line << endl;//设置用于写操作的文件描述符ev.data.fd=sockfd;//设置用于注测的写操作事件ev.events=EPOLLOUT|EPOLLET;//修改sockfd上要处理的事件为EPOLLOUT//epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);}else if(events[i].events&EPOLLOUT){sockfd = events[i].data.fd;write(sockfd, line, n);//设置用于读操作的文件描述符ev.data.fd=sockfd;//设置用于注测的读操作事件ev.events=EPOLLIN|EPOLLET;//修改sockfd上要处理的事件为EPOLINepoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);}}}return 0;}下面给出测试所用的Perl写的client端,在client中发送10字节的数据,同时让client在发送完数据之后进入死循环, 也就是在发送完之后连接的状态不发生改变--既不再发送数据, 也不关闭连接,这样才能观察出server的状态:#!/usr/bin/perluse IO::Socket;my $host = "127.0.0.1";my $port = 5000;my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@";my $msg_out = "1234567890";print $socket $msg_out;print "now send over, go to sleep\n";while (1){sleep(1);}运行server和client发现,server仅仅读取了5字节的数据,而client其实发送了10字节的数据,也就是说,server仅当第一次监听到了EPOLLIN事件,由于没有读取完数据,而且采用的是ET模式,状态在此之后不发生变化,因此server再也接收不到EPOLLIN事件了.(友情提示:上面的这个测试客户端,当你关闭它的时候会再次出发IO可读事件给server,此时server就会去读取剩下的5字节数据了,但是这一事件与前面描述的ET性质并不矛盾.)如果我们把client改为这样:#!/usr/bin/perluse IO::Socket;my $host = "127.0.0.1";my $port = 5000;my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@";my $msg_out = "1234567890";print $socket $msg_out;print "now send over, go to sleep\n";sleep(5);print "5 second gone send another line\n";print $socket $msg_out;while (1){sleep(1);}可以发现,在server接收完5字节的数据之后一直监听不到client的事件,而当client休眠5秒之后重新发送数据,server再次监听到了变化,只不过因为只是读取了5个字节,仍然有10个字节的数据(client第二次发送的数据)没有接收完.如果上面的实验中,对accept的socket都采用的是LT模式,那么只要还有数据留在buffer中,server就会继续得到通知,读者可以自行改动代码进行实验.基于这两个实验,可以得出这样的结论:ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;而LT模式是只要有数据没有处理就会一直通知下去的.补充说明一下这里一直强调的"状态变化"是什么:1)对于监听可读事件时,如果是socket是监听socket,那么当有新的主动连接到来为状态发生变化;对一般的socket而言,协议栈中相应的缓冲区有新的数据为状态发生变化.但是,如果在一个时间同时接收了N个连接(N>1),但是监听socket只accept了一个连接,那么其它未accept的连接将不会在ET模式下给监听socket 发出通知,此时状态不发生变化;对于一般的socket,就如例子中而言,如果对应的缓冲区本身已经有了N字节的数据,而只取出了小于N字节的数据,那么残存的数据不会造成状态发生变化.2)对于监听可写事件时,同理可推,不再详述.而不论是监听可读还是可写,对方关闭socket连接都将造成状态发生变化,比如在例子中,如果强行中断client脚本,也就是主动中断了socket连接,那么都将造成server端发生状态的变化,从而server得到通知,将已经在本方缓冲区中的数据读出.把前面的描述可以总结如下:仅当对方的动作(发出数据,关闭连接等)造成的事件才能导致状态发生变化,而本方协议栈中已经处理的事件(包括接收了对方的数据,接收了对方的主动连接请求)并不是造成状态发生变化的必要条件,状态变化一定是对方造成的.所以在ET模式下的,必须一直处理到出错或者完全处理完毕,才能进行下一个动作,否则可能会发生错误.另外,从这个例子中,也可以阐述一些基本的网络编程概念.首先,连接的两端中,一端发送成功并不代表着对方上层应用程序接收成功, 就拿上面的client测试程序来说,10字节的数据已经发送成功,但是上层的server 并没有调用read读取数据,因此发送成功仅仅说明了数据被对方的协议栈接收存放在了相应的buffer中,而上层的应用程序是否接收了这部分数据不得而知;同样的,读取数据时也只代表着本方协议栈的对应buffer 中有数据可读,而此时时候在对端是否在发送数据也不得而知.。
中秋三天,说闲也不闲,调调工作的代码,倒还解决不少问题。
不过也是因为最近工作忙的缘故,Esper被我冷落不少日子了,趁着今天最后一天,赶紧写一篇出来。
从上一篇开始说EPL的语法,主要是关于注解的。
今天来说说比较常用的语法,Select Clause和From Clause。
这个两个可以说是写EPL必备,要想得到事件流的处理结果,基本上就靠他们俩了(Pattern除外)。
今天的内容比较简单,还请各位同学牢记,以免以后应用的时候花时间看文档或者我的文章。
Select Clause1.查询事件流的所有属性及特定属性EPL的select和SQL的select很相近,SQL用*表示查询表的所有字段,而EPL 用*表示查询事件流的所有属性值。
SQL查询某个字段名,直接在select后跟字段名就ok,EPL也是将要查询的属性名放在select之后。
若查多个属性值,则用逗号分割。
和SQL一样,EPL查询属性也可以设置别名。
示例如下:[plain]view plaincopy1.// EPL:查询完整的User对象2.select * from User3.// 获取User对象er u = newEvent.getUnderlying();5.6.// EPL:查询User的name和id,id别名为i7.select name, id as i from User8.// 获取name和id9.String name = (String)newEvent.get("name");10.int id = (Integer)newEvent.get("i");这里要注意,如果查询的是一个完整对象,需要调用getUnderlying()方法,而get方法是针对确定的属性名或者别名。
另外*是不能设置别名的。
2.表达式除了查询完整对象和特定属性,EPL还支持属性值的计算,以计算后的值作为结果返回,并且也能设置别名。
ESPER构建大数据量实时分析引擎ESPER是一个实时数据分析引擎,并不是一个实时数据分析系统。
引擎和系统之间有很大的差别。
它无法作为一套完整独立的实时分析系统来使用它有以下几个缺点:(1)数据安全:ESPER引擎内部都是内存操作,一旦重启会丢失。
这里就涉及到一个日志failover的问题,那这个目前尚不在ESPER的处理范围内,需要自己来处理。
(2)高并发数据:ESPER引擎并没有做分布式框架,大量数据的分布式处理需要自己想办法来解决。
ESPER也有Enterprise版本、HA版本,据官方介绍这两个版本比较安全可靠,但由于这两个版本不开源、且收费,也未去做相应的研究在开源的esper基础上也可以自己搭建一套完整的实时数据分析系统。
先说说我的需求吧,目前有个实时数据流平台,能推送JSON格式的数据过来,每种格式的数据有各自不同的tag。
实时数据可以推送到MQ服务器,希望通过配置不同的EPL语句得到分析的结果,并将结果也输出都MQ服务器中。
由于数据量比较大,所有数据往单台服务器上推肯定是不现实的,因此需要对数据进行切分,实现分析引擎的水平扩展。
下面是我初步设计的架构图对各个模块简单说明一下:三、Scheduler:调度模块key就是select 语句后面的那些字段该模块编码不复杂,重点在于EPL语句的合理性六、Summary模块这个是对分布式运算的结果进行合并的模块,类似于Hadoop中的Reducer 这不是个现成的模块,需要自己来实现,主要功能是把各个ESPER引擎分布计算的日志进行聚合运算,实现sum,count,加减乘除等响应计算方法就可以了。
最好把输出结果发送到MQ队列中。
这里有两个个问题(1)因为是分布式运算,多个时间段的数据在到达Summary模块的时间先后顺序不一致,因此需要再ESPER推送到Summary的时候加入一个ID,只将相同ID的统计消息进行聚合运算。
(2)虽然说都是近5秒的数据,但是这个时间如果按照机器时间来算的话就很难办了,因为你5秒内收集到的数据可能是apache10s内的日志。
EPL命令简单解释最近使⽤条码打印机需要⽤到EPL语⾔,整理⼀下,使⽤EPL不需要打印驱动,打印控制到点阵,⾮常精确,最初使⽤Msbarcode9.ocx,显⽰没有问题,但打印的是空⽩,考虑到打印的精确调整使⽤EPL,⽹上没有PB的例⼦,我把PowerBuilder的代码整理如下:string ls_print,ls_fileinteger li_FileNumls_file = g_curr_path + '\' + 'ean13.prn'if fileexists(ls_file) thenfiledelete(ls_file)end ifli_FileNum = FileOpen(ls_file, LineMode!, Write!, LockWrite!, Append!)ls_print = char(13) + char(10)ls_print = ls_print + 'N' + char(13) + char(10)ls_print = ls_print + 'Q200,24' + char(13) + char(10)ls_print = ls_print + 'D10' + char(13) + char(10)ls_print = ls_print + 'ZT' + char(13) + char(10)ls_print = ls_print + 'S2' + char(13) + char(10)ls_print = ls_print + 'B44,15,0,E30,3,6,142,B,"'+string(dw_3.object.upc[1])+'"' + char(13) + char(10)ls_print = ls_print + 'P1' + char(13)FileWrite(li_FileNum, ls_print)FileClose(li_FileNum)Run("prt.bat", Minimized!)解释:<回车>此⾏为激活命令N<回车>清除缓冲区Q240,24<回车> 设置标签长度和间距, 长度=240dots=30mm,间距=24dots=3mmD10<回车>设置打印深度为10ZB<回车> 打印⽅向为旋转180度S2<回车>打印速度为2,范围2-4B240,80,0,E30,3,6,126,B,"123456789012" <回车>例范围为1-9;参数N为普通打印,如为R则为反转打印;“1234567890123”为打印内容打印条码,240,80为X,Y坐标点;0为条码⽅向正向;E30为EAN13码;3,6为条码的细条宽度,粗条宽度;126为条码⾼度;B表⽰打印条码下⽅字符;“123456789012”为打印内容,注意EAN13码最后⼀位⾃动⽣成P1<回车>打印,1为打印数量EPL2命令说明EPL2严格区分⼤⼩写A语法 Ap1,p2,p3,p4,p5,p6,p7,“DATA”参数 p1=以象素点为单位的⽔平起始位置p2=以象素点为单位的垂直起始位置p3=旋转Value Description0 No rotation1 90 degrees2 180 degrees3 270 degrees4 No rotation5 90 degrees6 180 degrees7 270 degrees4-7是针对亚洲字体的设置p4=字体选择Value Description203 dpi 300 dpi1 20.3 cpi, 6 pts, 25 cpi, 4 pts,(8 x 12 dots) (12 x 20 dots)2 16.9 cpi, 7 pts, 18.75 cpi, 6 pts,(10 x 16 dots) (16 x 28 dots)3 14.5 cpi, 10 pts, 15 cpi, 8 pts,(12 x 20 dots) (20 x 36 dots)4 12.7 cpi, 12 pts, 12.5 cpi, 10 pts,(14 x 24 dots) (24 x 44 dots)5 5.6 cpi, 24 pts, 6.25 cpi, 21 pts,(32 x 48 dots) (48 x 80 dots)6 Numeric Only Numeric Only(14 x 19 dots) (14 x 19 dots)7 Numeric Only Numeric Only(14 x 19 dots) (14 x 19 dots)亚洲打印机Simplified Chinese, Japanese, Korean 8 203 dpi fonts : 24 x 24 dots300 dpi Double-byte fonts : 36 x 36 dots 300 dpi Single-byte fonts : 24 x 36 dots Traditional Chinese, Japanese9 300 dpi Double-byte fonts: 36 x 36 dots 300 dpi Single-byte fonts : 24 x 36 dots Korean - Reservedp5=⽔平膨胀,值:1-6&8p6=垂直膨胀,值:1-9p7=N代表普通,R表⽰反转图象"DATA"=数据B⽤于打印标准条码语法:Bp1,p2,p3,p4,p5,p6,p7,p8,“DATA”参数 p1=以象素点为单位的⽔平起始位置p2=以象素点为单位的垂直起始位置p3=旋转Value Description0 No rotation1 90 degrees2 180 degrees3 270 degreesp4=条码选择,参见条码表p5=窄条的象数宽度,参见条码表p6=宽条的象数宽度,参见条码表p7=条码的象素⾼度p8=打印可读代码,值 B=是,N=否"DATA"=数据条码表BarCode De scrip tion P4Value P5Value P6Value Code 39 std. orextended31-10YCode 39 with checkdigit3C1-10NCode 9391-10NCode 128 UCC SerialShipping Container01-10N CodeCode 128 auto A, B, Cmodes11-10NCode 128 mode A1A1-10NCode 128 mode B1B1-10NCode 128 mode C1C1-10NCode 128 withDeutsche Post checkdigit41D2-10NCodabar K1-10YEAN8E802-4NEAN8 2 digit add-on E822-4NEAN8 5 digit add-on E852-4NEAN13E302-4NEAN13 2 digit add-on E322-4NEAN13 5 digit add-on E352-4NGerman Post Code2G3-4NInterleaved 2 of 521-10YInterleaved 2 of 5 withmod 10 check digit2C1-10YInterleaved 2 of 5 withhuman readable check2D1-10Y digitPostnet 5, 9, 11 & 13digit1P?NPlanet 11 & 13 digit1PL?NJapanese Postnet3J??UCC/EAN 12821E1-10NUPC A UA02-4NUPC A 2 digit add-on UA22-4NUPC A 5 digit add-on UA52-4NUPC E UE02-4NUPC E UE02-4N UPC E 2 digit add-on UE22-4N UPC E 5 digit add-on UE52-4N UPC Interleaved 2 of 52U1-10Y Plessey (MSI-1) withmod. 10 check digit L?? MSI-3 with mod. 10check digit M??。
国庆七天,本想出去玩玩,可是哪里都是人,所以还是家里蹲吧。
上篇说到了Select Clause和From Clause,今天这篇就说说Aggregation,Group by,Having 和Output Clause。
先预告一下,由于例子比较多,所以篇幅会有些长,需要各位耐心观看哦。
1.Aggregation和SQL一样,EPL也有Aggregation,即聚合函数。
语法如下:[plain]view plaincopy1.aggregate_function([all|distinct] expression)aggregate_function就是聚合函数的名字,比如avg,sum等。
expression通常是事件流的某个属性,也可以是不同事件流的多个属性,或者是属性和常量、函数之间的运算。
举例如下。
[plain]view plaincopy1.// 查询最新5秒的Apple的平均价格2.select avg(price) as aPrice from Apple.win:time(5 sec)3.4.// 查询最新10个Apple的价格总和的两倍5.select sum(price*2) as sPrice from Apple.win:length(10)6.7.// 查询最新10个Apple的价格,并用函数计算后再算平均值8.select avg(Compute.getResult(price)) from Apple.win:length(10)函数只能是静态方法,普通方法不可用。
即使是事件流里包含的静态方法,也必须用“类名.方法名”的方式进行引用。
可以使用distinct关键字对expression加以约束,表示去掉expression产生的重复的值。
默认情况下为all关键字,即所有的expression值都参与聚合运算。
例如:[plain]view plaincopy1.// 查询最新5秒的Apple的平均价格2.select avg(distinct price) as aPrice from Apple.win:time(5 sec)3.4.// 假如:5秒内进入了三个Apple事件,price分别为2,1,2。
2014是新的一年,正好也是本人的本命年。
既然是本命年,看来今年也是本人兴旺之年了。
开了个小玩笑,同时也祝各位同行今年少调bug多涨工资,这才是最实际的。
年前的最后一篇说的是子查询和join,基本上epl的大部分简单语法都说完了。
之前有朋友问我epl怎么和数据库交互,正好今天这篇就是来专门解释这个问题。
但是要提醒各位,本篇只是说明了在epl中如何与数据库交互,并且只能算是简单的交互。
而高级的用法会在esperio里有详细的指导(esperio的文档可在esper的官网找到)。
在esper的文档中,epl访问数据库的配置放在了比较靠后的位置,不过为了方便各位学习,这里会先说明和数据库交互的相关配置,然后再说epl怎么访问数据库。
配置文件在官方esper包的etc文件夹下,大家可以参考着学习。
1.连接数据库a.JNDI获取连接配置如下:[html]view plaincopy1.<database-reference name="mydb1">2.<datasource-connection context-lookup-name="java:comp/env/jdbc/mydb">3.<env-property name="java.naming.factory.initial" value ="com.myclass.CtxFactory"/>4.<env-property name="java.naming.provider.url" value="iiop://localhost:1050"/ >5.</datasource-connection>6.</database-reference>database-reference的name是要连接的数据库名字,其余的配置可参考JNDI 的文档使用方法:[java]view plaincopy1.if (envProperties.size() > 0) {2.initialContext = new InitialContext(envProperties);3.} else {4.initialContext = new InitialContext();5.}6.DataSource dataSource = (DataSource) initialContext.lookup(lookupName);7.Connection connection = dataSource.getConnection();更多内容可参考JNDI的文档b.从连接池获取连接配置如下:(以dbcp为例)[html]view plaincopy1.<database-reference name="mydb3">2.<!-- For a complete list of properties see ApacheDBCP. -->3.<datasourcefactory-connection class-name="mons.dbcp.BasicDataSourceFactory">4.<env-property name="username" value ="myusername"/>5.<env-property name="password" value ="mypassword"/>6.<env-property name="driverClassName" value ="com.mysql.jdbc.Driver"/>7.<env-property name="url" value ="jdbc:mysql://localhost/test"/>8.<env-property name="initialSize" value ="2"/>9.<env-property name="validationQuery" value ="select1 from dual"/>10. </datasourcefactory-connection>11.<connection-lifecycle value="pooled"/></database-reference>相同的配置可以使用esper的api达到同样的效果。
代码如下:[java]view plaincopy1.Properties props = new Properties();2.props.put("username", "myusername");3.props.put("password", "mypassword");4.props.put("driverClassName", "com.mysql.jdbc.Driver");5.props.put("url", "jdbc:mysql://localhost/test");6.props.put("initialSize", 2);7.props.put("validationQuery", "select 1 from dual");8.9.ConfigurationDBRef configDB = new ConfigurationDBRef();10.// BasicDataSourceFactory is an Apache DBCP import11.configDB.setDataSourceFactory(props, BasicDataSourceFactory.class.getName());12.configDB.setConnectionLifecycleEnum(ConfigurationDBRef.ConnectionLifecycleEnum.POOLED);13.Configuration configuration = new Configuration();14.configuration.addDatabaseReference("mydb3", configDB);同样,也可以自己实现数据源。
示例如下:[java]view plaincopy1.configDB.setDataSourceFactory(props, MyOwnDataSourceFactory.class.getName());2....3.class MyOwnDataSourceFactory {4.public static DataSource createDataSource(Properties properties) {5.return new MyDataSourceImpl(properties);6.}7.}c.JDBC获取连接前提是要将对应的jdbc驱动假如classpath[html]view plaincopy1.<database-reference name="mydb2">2.<drivermanager-connection class-name="com.mysql.jdbc.Driver" url="jdbc:mysql://localhost:3306/mydb2" user="myuser" pas sword="mypassword">3.<connection-arg name="user" value ="myuser"/>4.<connection-arg name="password" value ="mypassword"/>5.<connection-arg name="somearg" value ="someargvalue"/>6.</drivermanager-connection>7.</database-reference>注意:drivermanager-connection中的user和password属性必须填写,即使增加了connection-arg参数也不行。
所以实际上connection-arg的user和password是不需要写的。
这点我觉得esper做的不够人性化。
d.其他关于数据库连接的配置下面是一些和数据库交互的配置,更多配置可参考Javadoc[html]view plaincopy1.<database-reference name="mydb2">2.... configure data source or driver manager settings...3.<connection-settings auto-commit="true" catalog="mycatalog" read-only="true" transaction-isolation="1" />4.</database-reference>下面是关于连接的生命周期的配置[html]view plaincopy1.<database-reference name="mydb2">2.... configure data source or driver manager settings...3.<connection-lifecycle value="pooled"/><!-- retain -->4.</database-reference>如果参数值为pooled,当配置了连接池,则会将每次获取的连接还给连接池。