Storm这是一个分布式的、容错的实时计算系统,它被托管在GitHub上,遵循Eclipse Public License 1.0。Storm是由BackType开发的实时处理系统,BackType 现在已在Twitter麾下,基本是用Clojure写的。
Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式。Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。Storm的主工程师Nathan Marz表示:
Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。更棒的是你可以使用任意编程语言来做开发。
Storm的主要特点如下:
1.简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进
行实时处理的复杂性。
2.可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、
Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm
通信协议即可。
3.容错性。Storm会管理工作进程和节点的故障。
4.水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
5.可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它
会负责从消息源重试消息。
6.快速。系统的设计保证了消息能得到快速的处理,使用?MQ作为其底层消息队列。
7.本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。
这让你可以快速进行开发和单元测试。
Storm集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。Nimbus和Supervisor 都能快速失败,而且是无状态的,这样一来它们就变得十分健壮,两者的协调工作是由Apache ZooKeeper来完成的。
Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt 接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为Direct)。Topology 是由Stream Grouping连接起来的Spout和Bolt节点网络。在Storm Concepts页面里对这些术语有更详细的描述。
可以和Storm相提并论的系统有Esper、Streambase、HStreaming和Yahoo S4。其中和Storm最接近的就是S4。两者最大的区别在于Storm会保证消息得到处理。这些系
统中有的拥有内建数据存储层,这是Storm所没有的,如果需要持久化,可以使用一个类似于Cassandra或Riak这样的外部数据库。
入门的最佳途径是阅读GitHub上的官方《Storm Tutorial》。其中讨论了多种Storm 概念和抽象,提供了范例代码以便你可以运行一个Storm Topology。开发过程中,可以用本地模式来运行Storm,这样就能在本地开发,在进程中测试Topology。一切就绪后,以远程模式运行Storm,提交用于在集群中运行的Topology。Maven用户可以使用https://www.doczj.com/doc/297186748.html,提供的Storm依赖,地址是https://www.doczj.com/doc/297186748.html,/repo。
要运行Storm集群,你需要Apache Zookeeper、?MQ、JZMQ、Java 6和Python 2.6.6。ZooKeeper用于管理集群中的不同组件,?MQ是内部消息系统,JZMQ是?MQ的Java Binding。有个名为storm-deploy的子项目,可以在AWS上一键部署Storm集群。
Storm指南
在这个教程里面我们将学习如何创建Topologies, 并且把topologies部署到storm的集群里面去。Java将是我们主要的示范语言,个别例子会使用python以演示storm的多语言特性。
准备工作
这个教程使用storm-starter项目里面的例子。我推荐你们下载这个项目的代码并且跟着教程一起做。先读一下:配置storm开发环境和新建一个strom项目这两篇文章把你的机器设置好。
一个Storm集群的基本组件
storm的集群表面上看和hadoop的集群非常像。但是在Hadoop上面你运行的是MapReduce的Job, 而在Storm上面你运行的是Topology。它们是非常不一样的—一个关键的区别是:一个MapReduce Job最终会结束,而一个Topology运永远运行(除非你显式的杀掉他)。
在Storm的集群里面有两种节点:控制节点(master node)和工作节点(worker node)。控制节点上面运行一个后台程序:Nimbus,它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分布代码,分配工作给机器,并且监控状态。
每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个Topology的一个子集;一个运行的Topology由运行在很多机器上的很多工作进程组成。
storm topology结构
Nimbus和Supervisor之间的所有协调工作都是通过一个Zookeeper集群来完成。并且,nimbus进程和supervisor 经常都是快速失败(fail-fast)和无状态的。所有的状态要么在Zookeeper里面,要么在本地磁盘上。这也就意味着你可以用kill -9来杀死nimbus和
supervisor进程,然后再重启它们,它们可以继续工作,就好像什么都没有发生过似的。这个设计使得storm不可思议的稳定。
Topologies
为了在storm上面做实时计算,你要去建立一些topologies。一个topology就是一个计算节点所组成的图。Topology里面的每个处理节点都包含处理逻辑,而节点之间的连接则表示数据流动的方向。
运行一个Topology是很简单的。首先,把你所有的代码已经所依赖的jar打进一个jar包。然后运行类似下面的这个命令。
strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2
这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main 函数定义这个topology并且把它提交给Nimbus。storm jar负责连接到nimbus并且上传jar文件。
因为topology的定义其实就是一个Thrift结构并且nimbus就是一个Thrift服务,有可以用任何语言创建并且提交topology。上面的方面是用JVM
-based语言提交的最简单的方法, 看一下文章: Running topologies on a production cluster去看看怎么启动和停止topologies。
Stream
Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。storm提供一些语言来以分布式地,可靠地把一个stream传输进一个新的stream。比如:你可以把一个tweets流传输到热门话题的流。
storm提供的最基本的处理stream的原语是spout和bolt。Spout和bolt有接口可以实现以让你实现你的应用的逻辑。
spout的流的源头。比如一个spout可能从Kestrel队列里面读取消息并且把这些消息发射成一个流。又比如一个spout可以调用twitter的一个api并且把返回的tweets发射成一个流。
bolt可以接收任意多个输入stream,作一些处理,有些bolt可能还会发射一些新的stream。一些复杂的流转换,比如从一些tweet里面计算出热门话题,需要多个步骤,从而也就需要多个bolt。Bolt可以做任何事情: 运行函数,过滤tuple, 做一些聚合,做一些合并以及访问数据库等等。
spout和bolt所组成一个网络会被打包成topology,topology是storm里面最高一级的抽象,你可以把topology提交给storm的集群来运行。topology的结构在Topology 那一段已经说过了,这里就不再赘述了。
topology结构
topology里面的每一个节点都是并行运行的。在你的topology里面,你可以指定每个节点的并行度,storm则会在集群里面分配那么多线程来同时计算。
一个topology会一直运行直到你杀死它。storm自动重新分配一些运行失败的任务,并且storm保证你不会有数据丢失,即使在一些机器意外停机并且消息被丢掉的情况下。数据模型(Data Model)
storm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型,在我的理解里面一个tuple可以看作一个没有方法的java对象。总体来看,storm支持所有的基本类型,字符串以及字节数组作为tuple的值类型。你也可以使用你自己定义的类型来作为值类型,只要你实现对应的序列化器(serializer)。topology里面的每个节点必须定义它要发射的tuple的每个字段。比如下面这个bolt定义它锁发射的tuple包含两个字段,类型分别是: doble和triple。
publicclass DoubleAndTripleBolt implements IRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollecto rBase collector){
_collector = collector;
}
@Override
public void execute(Tuple input){
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void cleanup(){
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("double", "triple"));
}
}
declareOutputFields方法定义要输出的字段:["double", "triple"]。这个bolt的其它部分我们接下来会解释。
一个简单的Topology
让我们来看一个简单的topology的例子,我们看一下storm-starter里面的ExclamationTopology:
TopologyBuilder builder =new TopologyBuilder();
builder.setSpout(1, new TestWordSpout(), 10);
builder.setBolt(2, new ExclamationBolt(), 3)
.shuffleGrouping(1);
builder.setBolt(3, new ExclamationBolt(), 2)
.shuffleGrouping(2);
这个Topology包含一个Spout和两个Bolt。Spout发射单词,每个bolt在每个单词后面加个”!!!”。这三个节点被排成一条线: spout发射单词给第一个bolt,第一个bolt然后把处理好的单词发射给第二个bolt。如果spout发射的单词是["bob"]和["john"], 那么第二个bolt会发射["bolt!!!!!!"]和["john!!!!!!"]出来。
我们使用setSpout和setBolt来定义Topology里面的节点。这些方法接收我们指定的一个id,一个包含处理逻辑的对象(spout或者bolt), 以及你所需要的并行度。
这个包含处理的对象如果是spout那么要实现IRichSpout的接口,如果是bolt,那么就要实现IRichBolt接口.
最后一个指定并行度的参数是可选的。它表示集群里面需要多少个thread来一起执行这个节点。如果你忽略它那么storm会分配一个线程来执行这个节点。
setBolt方法返回一个InputDeclarer对象,这个对象是用来定义Bol的输入。这里第一个Bolt声明它要读取spout所发射的所有的tuple —使用shuffle grouping。而第二个bolt声明它读取第一个bolt所发射的tuple。shuffle grouping表示所有的tuple会被随机的分发给bolt的所有task。给task分发tuple的策略有很多种,后面会介绍。
如果你想第二个bolt读取spout和第一个bolt所发射的所有的tuple,那么你应该这样定义第二个bolt:
builder.setBolt(3, new ExclamationBolt(), 5)
.shuffleGrouping(1)
.shuffleGrouping(2);
让我们深入地看一下这个topology厘米的spout和bolt是怎么实现的。Spout负责发射新的tuple到这个topology里面来。TestWordSpout从["nathan", "mike", "jackson", "golda", "bertels"]里面随机选择一个单词发射出来。TestWordSpout里面的nextTuple()方法是这样定义的:
public void nextTuple(){
Utils.sleep(100);
final String[] words =new String[]{"nathan", "mike", "jackson", "golda ", "bertels"};
final Random rand =new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
可以看到,实现很简单。
ExclamationBolt把”!!!”拼接到输入tuple后面。我们来看下ExclamationBolt的完整实现。
publicstaticclass ExclamationBolt implements IRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollecto r collector){
_collector = collector;
}
public void execute(Tuple tuple){
_collector.emit(tuple, new Values(tuple.getString(0)+"!!! "));
_collector.ack(tuple);
}
public void cleanup(){
}
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("word"));
}
}
prepare方法提供给bolt一个Outputcollector用来发射tuple。Bolt可以在任何时候发射tuple —在prepare, execute或者cleanup方法里面, 或者甚至在另一个线程里面异步发射。这里prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法使用。
execute方法从bolt的一个输入接收tuple(一个bolt可能有多个输入源). ExclamationBolt获取tuple的第一个字段,加上”!!!”之后再发射出去。如果一个bolt 有多个输入源,你可以通过调用Tuple#getSourceComponent方法来知道它是来自哪个输入源的。
execute方法里面还有其它一些事情值得一提:输入tuple被作为emit方法的第一个参数,并且输入tuple在最后一行被ack。这些呢都是Storm可靠性API的一部分,后面会解释。
cleanup方法在bolt被关闭的时候调用,它应该清理所有被打开的资源。但是集群不保证这个方法一定会被执行。比如执行task的机器down掉了,那么根本就没有办法来调用那个方法。cleanup设计的时候是被用来在local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群), 并且你想在关闭一些topology的时候避免资源泄漏。
最后,declareOutputFields定义一个叫做”word”的字段的tuple。
以local mode运行ExclamationTopology
让我们看看怎么以local mode运行ExclamationToplogy。
storm的运行有两种模式: 本地模式和分布式模式. 在本地模式中,storm用一个进程里面的线程来模拟所有的spout和bolt. 本地模式对开发和测试来说比较有用。你运行storm-starter里面的topology的时候它们就是以本地模式运行的,你可以看到topology 里面的每一个组件在发射什么消息。
在分布式模式下,storm由一堆机器组成。当你提交topology给master的时候,你同时也把topology的代码提交了。master负责分发你的代码并且负责给你的topolgoy分配工作进程。如果一个工作进程挂掉了,master节点会把认为重新分配到其它节点。关于如何在一个集群上面运行topology,你可以看看Running topologies on a production cluster文章。
下面是以本地模式运行ExclamationTopology的代码:
Config conf =new Config();
conf.setDebug(true);conf.setNumWorkers(2);
LocalCluster cluster =new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
首先,这个代码定义通过定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是一样的。通过调用submitTopology方法来提交topology,它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology本身。
topology的名字是用来唯一区别一个topology的,这样你然后可以用这个名字来杀死这个topology的。前面已经说过了,你必须显式的杀掉一个topology,否则它会一直运行。
Conf对象可以配置很多东西,下面两个是最常见的:
1. TOPOLOGY_WORKERS(setNumWorkers) 定义你希望集群分配多少个工作进程给你来执行这个topology. topology里面的每个组件会被需要线程来执行。每个组件到底用多少个线程是通过setBolt和setSpout来指定的。这些线程都运行在工作进程里面. 每一个工作进程包含一些节点的一些工作线程。比如,如果你指定300个线程,60个进程,那么每个工作进程里面要执行6个线程,而这6个线程可能属于不同的组件(Spout, Bolt)。你可以通过调整每个组件的并行度以及这些线程所在的进程数量来调整topology的性能。
2.TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话,storm会记录下每个组件所发射的每条消息。这在本地环境调试topology很有用,但是在线上这么做的话会影响性能的。
感兴趣的话可以去看看Conf对象的Javadoc去看看topology的所有配置。
可以看看创建一个新storm项目去看看怎么配置开发环境以使你能够以本地模式运行topology.
Stream grouping
Stream grouping告诉topology如何在两个组件之间发送tuple。要记住,spouts和bolts以很多task的形式在topology里面同步执行。如何从task粒度来看一个运行的topology,它应该是这样的:
从task角度来看topology
当Bolt A的一个task要发送一个tuple给Bolt B,它应该发送给Bolt B的哪个task呢?stream grouping专门回答这种问题的。在我们深入研究不同的stream grouping之前,让我们看一下storm-starter里面的另外一个topology。WordCountTopology读取一些句子,输出句子里面每个单词出现的次数.
TopologyBuilder builder =new TopologyBuilder();
builder.setSpout(1, new RandomSentenceSpout(), 5);
builder.setBolt(2, new SplitSentence(), 8)
.shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 12)
.fieldsGrouping(2, new Fields("word"));
SplitSentence对于句子里面的每个单词发射一个新的tuple, WordCount在内存里面维护一个单词->次数的mapping,WordCount每收到一个单词,它就更新内存里面的统计状态。
有好几种不同的stream grouping:
最简单的grouping是shuffle grouping, 它随机发给任何一个task。上面例子里面RandomSentenceSpout和SplitSentence之间用的就是shuffle grouping, shuffle grouping对各个task的tuple分配的比较均匀。
一种更有趣的grouping是fields grouping, SplitSentence和WordCount之间使用的就是fields grouping, 这种grouping机制保证相同field值的tuple会去同一个task,这对于WordCount来说非常关键,如果同一个单词不去同一个task,那么统计出来的单词次数就不对了。
fields grouping是stream合并,stream聚合以及很多其它场景的基础。在背后呢,fields grouping使用的一致性哈希来分配tuple的。
还有一些其它类型的stream grouping. 你可以在Concepts一章里更详细的了解。
使用别的语言来定义Bolt
Bolt可以使用任何语言来定义。用其它语言定义的bolt会被当作subprocess来执行,storm使用JSON消息通过stdin/stdout来和这些subprocess通信。这个通信协议是一个只有100行的库,storm团队给这些库开发了对应的Ruby, Python和Fancy版本。下面是WordCountTopology里面的SplitSentence的定义:
publicstaticclass SplitSentence extends ShellBolt implements IRichBo lt {
public SplitSentence(){
super("python", "splitsentence.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("word"));
}
}
SplitSentence继承自ShellBolt并且声明这个Bolt用python来运行,并且参数是: splitsentence.py。下面是splitsentence.py的定义:
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
SplitSentenceBolt().run()
更多有关用其它语言定义Spout和Bolt的信息,以及用其它语言来创建topology的信息可以参见: Using non-JVm languages with Storm.
可靠的消息处理
在这个教程的前面,我们tuple发射的一些方面。这些方面就是storm的可靠性API:storm 如果保证spout发出的每一个tuple都被完整处理。看看《storm如何保证消息不丢失》以更深入了解storm的可靠性API.
结论
这个入门教程比较广泛的介绍了从开发,测试和部署一个topology. 文档的其它部分会深入介绍使用storm的各个方面。
Storm如何保证消息不丢失
storm保证从spout发出的每个tuple都会被完全处理。这篇文章介绍storm是怎么做到这个保证的,以及我们使用者怎么做才能充分利用storm的可靠性特点。
一个tuple被”完全处理”是什么意思?
就如同蝴蝶效应一样,从spout发射的一个tuple可以引起其它成千上万个tuple因它而产生,想想那个计算一篇文章中每个单词出现次数的topology.
TopologyBuilder builder =new TopologyBuilder();
builder.setSpout(1, new KestrelSpout("https://www.doczj.com/doc/297186748.html,", 22133,
"sentence_queue",
new StringScheme()));
builder.setBolt(2, new SplitSentence(), 10)
.shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20)
.fieldsGrouping(2, new Fields("word"));
这个topology从一个Kestrel队列读取句子,把每个句子分割成一个个单词,然后发射这一个个单词:一个源tuple(一个句子)引起后面很多tuple的产生(一个个单词),这个消息流大概是这样的:
统计单词出现次数的tuple树
在storm里面一个tuple被完全处理的意思是:这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理。而这个timetout可以通过
Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来指定。
如果一个消息处理成功了或者失败了会发生什么?
FYI。下面这个是spout要实现的接口:
publicinterface ISpout extends Serializable{
void open(Map conf, TopologyContext context, SpoutOutputCollector co llector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
首先storm通过调用spout的nextTuple方法来获取下一个tuple, Spout通过open方法参数里面提供的SpoutOutputCollector来发射新tuple到它的其中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面我们通过这个tuple-id来追踪这个tuple。举例来说,KestrelSpout从kestrel队列里面读取一个消息,并且把kestrel提供的消息id作为message-id, 看例子:
_collector.emit(new Values("field1", "field2", 3) , msgId);
接下来,这个发射的tuple被传送到消息处理者bolt那里,storm会跟踪由此所产生的这课tuple树。如果storm检测到一个tuple被完全处理了,那么storm会以最开始的那个message-id作为参数去调用消息源的ack方法;反之storm会调用spout的fail方法。值得注意的一点是,storm调用ack或者fail的task始终是产生这个tuple的那个task。
所以如果一个spout被分成很多个task来执行,消息执行的成功失败与否始终会通知最开始发出tuple的那个task。
我们再以KestrelSpout为例来看看spout需要做些什么才能保证“一个消息始终被完全处理”, 当KestrelSpout从Kestrel里面读出一条消息,首先它“打开”这条消息,这意味着这条消息还在kestrel队列里面,不过这条消息会被标示成“处理中”直到ack或者fail被调用。处于“处理中“状态的消息不会被发给其他消息处理者了;并且如果这个spout “断线”了,那么所有处于“处理中”状态的消息会被重新标示成“等待处理”. Storm的可靠性API
作为storm的使用者,有两件事情要做以更好的利用storm的可靠性特征。首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple之后要通知storm。这样storm就可以检测整个tuple树有没有完成处理,并且通知源spout处理结果。storm 提供了一些简洁的api来做这些事情。
由一个tuple产生一个新的tuple称为:anchoring。你发射一个新tuple的同时也就完成了一次anchoring。看下面这个例子:这个bolt把一个包含一个句子的tuple分割成每个单词一个tuple。
publicclass SplitSentence implements IRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollecto r collector){
_collector = collector;
}
public void execute(Tuple tuple){
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")){
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
public void cleanup(){
}
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("word"));
}
}
看一下这个execute方法,emit的第一个参数是输入tuple,第二个参数则是输出tuple,这其实就是通过输入tuple anchoring了一个新的输出tuple。因为这个“单词tuple”被anchoring在“句子tuple”一起,如果其中一个单词处理出错,那么这整个句子会被重新处理。作为对比,我们看看如果通过下面这行代码来发射一个新的tuple的话会有什么结果。
_collector.emit(new Values(word));
用这种方法发射会导致新发射的这个tuple脱离原来的tuple树(unanchoring), 如果这个tuple处理失败了,整个句子不会被重新处理。到底要anchoring还是要unanchoring 则完全取决于你的业务需求。
一个输出tuple可以被anchoring到多个输入tuple。这种方式在stream合并或者stream 聚合的时候很有用。一个多入口tuple处理失败的话,那么它对应的所有输入tuple都要重新执行。看看下面演示怎么指定多个输入tuple:
List anchors =new ArrayList
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
多入口tuple把这个新tuple加到了多个tuple树里面去了。
我们通过anchoring来构造这个tuple树,最后一件要做的事情是在你处理完当个tuple 的时候告诉storm, 通过OutputCollector类的ack和fail方法来做,如果你回过头来看看SplitSentence的例子,你可以看到“句子tuple”在所有“单词tuple”被发出之后调用了ack。
你可以调用OutputCollector的fail方法去立即将从消息源头发出的那个tuple标记为fail,比如你查询了数据库,发现一个错误,你可以马上fail那个输入tuple,这样可以让这个tuple被快速的重新处理,因为你不需要等那个timeout时间来让它自动fail。每个你处理的tuple,必须被ack或者fail。因为storm追踪每个tuple要占用内存。所以如果你不ack/fail每一个tuple,那么最终你会看到OutOfMemory错误。
大多数Bolt遵循这样的规律:读取一个tuple;发射一些新的tuple;在execute的结束的时候ack这个tuple。这些Bolt往往是一些过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。如果用BasicBolt来做,上面那个SplitSentence可以改写成这样:
publicclass SplitSentence implements IBasicBolt {
public void prepare(Map conf, TopologyContext context){
}
public void execute(Tuple tuple, BasicOutputCollector collector){ String sentence = tuple.getString(0);
for(String word: sentence.split(" ")){
collector.emit(new Values(word));
}
}
public void cleanup(){
}
public void declareOutputFields(OutputFieldsDeclarer declarer){
基于Storm的实时大数据处理 摘要:随着互联网的发展,需求也在不断地改变,基于互联网的营销业务生命周期越来越短,业务发展变化越来越快,许多业务数据量以指数级增长等等都要求对大量的数据做实时处理,并要求保证数据准确可靠。面对这些挑战云计算、大数据概念应运而生,Hadoop、Storm等技术如雨后春笋般出现。本文就当今最火的实时流数据处理系统Storm进行详细介绍。在介绍Storm之前首先详细介绍了实时计算和分布式系统相关技术概念以便为后面内容做铺垫。通过对Storm的基本概念、核心理念、运行机制和编程场景进行了全面的探讨,使得我们对Storm有了一个比较全面的理解和方便我们在这方面进行更进一步的学习。 关键字:Storm;实时大数据;流数据处理 1概要 当今世界,信息爆炸的时代,互联网上的数据正以指数级别的速度增长。新浪微博注册用户已经超过3亿,用户日平均在线时长60min,平均每天发布超过1亿条微博[1]。在这种背景下,云计算的概念被正式提出,立即引起了学术界和产业界的广泛关注和参与。Google 是云计算最早的倡导者,随后各类大型软件公司都争先在“云计算”领域进行一系列的研究和部署工作。目前最流行的莫过于Apache的开源项目Hadoop分布式计算平台,Hadoop专注于大规模数据存储和处理。这种模型对以往的许多情形虽已足够,如系统日志分析、网页索引建立(它们往往都是把过去一段时间的数据进行集中处理),但是在实时大数据方面,Hadoop的MapReduce却显得力不从心,业务场景中需要低延迟的响应,希望在秒级别或者毫秒级别完成分析,得到响应,并希望能够随着数据量的增大而扩展。此时,Twitter公司推出开源分布式、容错的实时流计算系统Storm,它的出现使得大规模数据实时处理成为可能,填补了该领域的空白。 Storm是一个类似于Hadoop可以处理大量数据流的分布式实时计算系统。但是二者存在很大的区,其最主要的区别在于Storm的数据一直在内存中流转,Hadoop使用磁盘作为交换介质,需要读写磁盘。在应用领域方面,Storm是基于流的实时处理,Hadoop是基于任务调度的批量处理。另一个方面,Hadoop基于HDFS需要切分输入数据、产生中间数据文件、排序、数据压缩、多份复制等,效率比较低,而Storm基于ZeroMQ这个高性能消息通讯库,不持久化数据[2]。 2实时计算介绍 实时计算(Real-time computing)也称为即时计算,是计算机科学中对受到“实时约束”的计算机硬件和计算机软件系统的研究,实时约束是从事件发生到系统回应之间的最长时间限制。实时程序必须保证在严格的时间限制内响应。 互联网领域的实时计算一般都是针对海量数据进行的,实时计算最重要的一个需求是能够实时响应计算结果,一般要求为秒级。互联网行业的实时计算可以分为以下两种应用场景:(1)持续计算:主要用于互联网流式数据处理。所谓流式数据是指将数据看作是数据流的形式来处理。数据流是一系列数据记录的集合体。常见的数据流如网站的访问PV/UV、点击、搜索关键字。 (2)实时分析:主要用于特定场合下的数据分析处理。当数据量很大,且存在无穷的查询条件组合,或穷举并提前计算和保存结果的代价很大时,实时计算就可以发挥作用,将部分计算或全部计算过程推迟到查询阶段进行,但要求能够实时响应。 实时计算需要解决的问题和难点是实时存储和实时计算。实时存储可以通过使用高性能