Storm常见模式——流聚合
- 格式:doc
- 大小:34.50 KB
- 文档页数:4
Storm 处理流数据的工作机制Storm 是一个开源的分布式实时计算系统,被广泛应用于处理实时流数据。
它具有高容错性、高吞吐量和低延迟的特点,适用于各种实时数据处理场景。
本文将详细介绍 Storm 处理流数据的工作机制。
1. Storm 概述Storm 是由 Apache 开源的,用于处理实时流数据的分布式计算系统,它能够在集群中处理连续不断的数据流,并实时地进行分析和计算。
它具有可伸缩性、容错性和高性能的特点,广泛应用于大规模数据处理领域。
2. Storm 架构Storm 运行在分布式集群中,通常由主节点(Master Node)和工作节点(Worker Node)组成。
Storm 采用主从架构,主节点负责任务分配和协调,而工作节点负责实际的数据处理。
2.1 Spout 组件在 Storm 中,Spout 组件用于从数据源中读取流数据,并将其发送给后续的数据处理组件。
Spout 组件可以读取各种类型的数据源,例如消息队列、文件系统或网络流。
2.2 Bolt 组件Bolt 组件是 Storm 中的数据处理单元,负责对传入的数据流进行处理和转换。
Bolt 组件可以执行各种计算、过滤、聚合和输出等操作,可以单独使用或者通过拓扑结构串连多个 Bolt 组件形成任务流水线。
2.3 Topology 拓扑Storm 中的任务被称为拓扑(Topology),它由一组 Spout 组件和一组 Bolt 组件构成。
通过定义这些组件之间的连接关系,可以形成一个完整的数据处理流程。
3. Storm 的工作流程Storm 的工作流程可以概括为以下几个步骤:1.主节点将任务拓扑提交给 Storm 集群。
2.Storm 集群将任务拓扑分发给工作节点。
3.每个工作节点负责执行一部分任务,并创建对应的任务线程。
4.Spout 组件从数据源中读取流数据,并将其发送给后续的 Bolt 组件。
5.Bolt 组件对接收到的数据流进行处理和转换,并将结果发送给下一个 Bolt组件或最终输出。
storm的用法一、了解Storm大数据处理框架Storm是一个用于实时流数据处理的分布式计算框架。
它由Twitter公司开发,并于2011年发布。
作为一个开源项目,Storm主要用于处理实时数据,比如实时分析、实时计算、流式ETL等任务。
二、Storm的基本概念及特点1. 拓扑(Topology):拓扑是Storm中最重要的概念之一。
它代表了整个计算任务的结构和流程。
拓扑由一系列组件组成,包括数据源(Spout)、数据处理节点(Bolt)以及它们之间的连接关系。
2. 数据源(Spout):Spout负责从外部数据源获取数据,并将其发送给Bolt进行处理。
在拓扑中,通常会有一个或多个Spout进行数据输入。
3. 数据处理节点(Bolt):Bolt是对数据进行实际处理的模块。
在Bolt中可以进行各种自定义的操作,如过滤、转换、聚合等,根据业务需求不同而定。
4. 流组(Stream Grouping):Stream Grouping决定了从一个Bolt到下一个Bolt 之间的任务调度方式。
Storm提供了多种Stream Grouping策略,包括随机分组、字段分组、全局分组等。
5. 可靠性与容错性:Storm具有高可靠性和容错性的特点。
它通过对任务状态进行追踪、失败重试机制和数据备份等方式,确保了整个计算过程的稳定性。
6. 水平扩展:Storm可以很方便地进行水平扩展。
通过增加计算节点和调整拓扑结构,可以实现对处理能力的无缝提升。
三、Storm的应用场景1. 实时分析与计算:Storm适用于需要对大规模实时数据进行即时分析和计算的场景。
比如金融领域中的实时交易监控、电商平台中用户行为分析等。
2. 流式ETL:Storm可以实现流式ETL(Extract-Transform-Load)操作,将源数据进行抽取、转换和加载到目标系统中,并实时更新数据。
3. 实时推荐系统:通过结合Storm和机器学习算法,可以构建快速响应的实时推荐系统。
storm技术中的流(Stream)_光环大数据storm课程培训流(Stream)是Storm的核心抽象,是一个无界的元组序列。
源源不断传递的元组就组成了流,在分布式环境中并行地进行创建和处理。
流被定义成一个为流中元组字段进行命名的模式,默认情况下,元组可以包含整型(integer)、长整型(long)、短整型(short)、字节(byte)、字符(string)、双精度数(double)、浮点数(float)、布尔值(boolean)和字节数组(bytearray),还可以自定义序列化器,以便本地元组可以使用自定义类型。
流由元组组成,使用OutputFieldsDeclarer声明流及其模式。
Serialization是Storm的动态元组类型的信息,声明自定义序列化。
自定义序列化必须实现ISerialization接口,自定义序列化可以注册使用CONFIG.TOPOLOGY_SERIALIZATIONS这个配置。
Storm提供可靠的方式把原语转换成一个新的分布式的流,执行流转换的基本元素是Spout和Bolt。
Spout是流的源头,通常从外部数据源读取元组,并emit到拓扑中。
例如,Spout从Kestrel队列中读取元组,并作为一个流提交到拓扑。
Bolt接收任何数量的输入流,执行处理后,可能提交新的流。
复杂流的转换,如从tweets流中计算一个热门话题,需要多个步骤,因此需要多个Bolt。
Bolt可以执行运行函数、过滤元组、连接流和连接数据库等任何事情。
如图所示,Spout和Bolt的网络被打包成一个“拓扑”,即顶级抽象,之后提交这个拓扑到Storm集群中执行。
拓扑是一个图的流转换,节点表示Spout或Bolt,弧边指示哪些Bolt订阅了该流。
当一个Spout或Bolt发射一个元组到一个流时,它会发射元组到每一个订阅该流的Bolt。
StormTopology拓扑节点之间的连接表示元组应该如何传递。
Storm的5个主要术语Storm是一个开源分布式实时计算系统,它被广泛应用于大规模数据处理和实时分析。
在Storm中,有一些主要的术语被用来描述其核心概念和工作原理。
本文将详细介绍Storm的5个主要术语,包括拓扑(Topology)、流(Stream)、Spout、Bolt和任务(Task)。
1. 拓扑(Topology)拓扑是Storm中最基本的概念之一。
它表示了一个实时计算任务的结构和流程。
拓扑由多个组件(Component)组成,每个组件负责特定的数据处理任务。
组件之间通过流进行连接,形成了一个有向无环图。
拓扑可以看作是一个数据处理的蓝图,它定义了数据从输入到输出的整个计算过程。
在拓扑中,每个组件都可以并行执行,并且可以在不同节点上进行分布式部署。
通过合理设计拓扑结构,可以实现高效的数据处理和并行计算。
2. 流(Stream)流是Storm中用来传递数据的基本单位。
它代表了一系列具有相同类型的数据项,在拓扑中从一个组件流向另一个组件。
流可以包含多个字段,每个字段都有特定的类型和含义。
在拓扑中,流可以被分为多个分支,每个分支可以由不同的组件处理。
这种方式使得数据可以以不同的路径进行处理,从而实现更灵活和高效的计算。
同时,流还支持多种操作,如过滤、聚合、转换等,可以对数据进行各种形式的处理和加工。
3. SpoutSpout是Storm中用于数据输入的组件。
它负责从外部数据源读取数据,并将其发送到拓扑中的下一个组件。
Spout可以读取各种不同类型的数据源,如消息队列、文件系统、数据库等。
在拓扑中,Spout通常是数据流的起点。
它以一定的速率产生数据,并通过流发送给下一个组件进行处理。
Spout还可以实现可靠性保证机制,确保数据不会丢失或重复发送。
通过合理配置Spout的并行度和任务数,可以实现高吞吐量和低延迟的数据输入。
4. BoltBolt是Storm中用于数据处理和计算的组件。
它接收来自上游组件(如Spout或其他Bolt)传递过来的数据流,并对其进行加工、过滤、聚合等操作。
storm的用法总结大全- Storm是一个开源的实时大数据处理系统,用于处理实时数据流。
它可以与Hadoop 集成,提供高性能的实时数据处理能力。
- Storm可以用于实时分析和处理大规模数据流,如日志数据、传感器数据等。
它可以处理来自不同数据源的数据流,并将数据流分发到不同的处理单元进行处理。
- Storm使用一种称为拓扑(Topology)的方式来描述数据处理流程。
拓扑是由多个处理单元(称为Bolt)和连接它们的数据流(称为Spout)组成的。
- Spout可以从数据源中读取数据,并将数据流发射给Bolt进行处理。
Bolt可以对数据进行转换、过滤、聚合等操作,并将结果发射给下一个Bolt进行处理。
多个Bolt可以并行地执行不同的处理任务。
- Storm的拓扑可以灵活地配置,可以按照需要添加、删除、修改Bolt和Spout。
它支持高可靠性、高吞吐量的数据流处理,并且可以实现在不同的节点之间进行任务的负载均衡。
- Storm提供了可扩展性和容错性,可以通过水平扩展集群节点来处理更大规模的数据流,并且在节点故障时能够保证处理的连续性。
- Storm提供了丰富的API和工具,可以方便地开发和调试数据处理拓扑。
它支持多种编程语言,如Java、Python等,并提供了强大的拓扑调试和可视化工具,方便监控和管理拓扑的运行状态。
- Storm可以与其他大数据处理框架(如Hadoop、Hive、HBase等)集成,在数据处理过程中实现数据的交换和共享。
它还可以与消息中间件(如Kafka、RabbitMQ等)和实时数据库(如Redis、Cassandra等)集成,实现与其他系统的无缝连接。
- Storm有广泛的应用场景,如实时推荐系统、实时风控系统、实时数据分析、实时监控和报警等。
它在互联网、金融、电信、物联网等领域都有着广泛的应用。
storm基本概念在运行一个Storm任务之前,需要了解一些概念:1.Topologies2.Streams3.Spouts4.Bolts5.Stream groupings6.Reliability7.Tasks8.Workers9.ConfigurationStorm集群和Hadoop集群表面上看很类似。
但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的。
一个关键的区别是:一个MapReduce job最终会结束,而一个topology永远会运行(除非你手动kill掉)。
在Storm的集群里面有两种节点:控制节点(master node)和工作节点(worker node)。
控制节点上面运行一个叫Nimbus后台程序,它的作用类似Hadoop里面的JobTracker。
Nimbus负责在集群里面分发代码,分配计算任务给机器,并且监控状态。
每一个工作节点上面运行一个叫做Supervisor的节点。
Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。
每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程组成。
Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper 集群完成。
另外,Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。
所有的状态要么在zookeeper里面,要么在本地磁盘上。
这也就意味着你可以用kill -9来杀死Nimbus和Supervisor进程,然后再重启它们,就好像什么都没有发生过。
这个设计使得Storm异常的稳定。
2.1.1 Topologies一个topology是spouts和bolts组成的图,通过stream groupings将图中的spouts和bolts连接起来,如下图:一个topology会一直运行直到你手动kill掉,Storm自动重新分配执行失败的任务,并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话)。
storm原理
Storm是一个开源的分布式实时计算系统,它可以用于实时大数据处理、分布式计算、流式计算等场景。
Storm的核心是一个分布式实时流数据处理引擎,它可以支持高可用、高可靠、高吞吐等特性。
Storm采用了类似于MapReduce的数据处理模型,但是它能够实现更高的计算速度和更低的延迟。
Storm的架构包含了多个组件,包括Zookeeper、Nimbus、Supervisor、Worker等。
其中,Zookeeper用于存储Storm的元数据,Nimbus是Storm的主节点,负责协调各个组件的工作,Supervisor 是Storm的工作节点,负责接收和处理任务,Worker是Supervisor 的子进程,用于具体的数据处理和计算。
Storm的计算模型包含了两种基本的数据类型:流和批。
流式数据是指实时产生的数据流,例如传感器数据、日志数据等。
批式数据则是指离线生成的数据集,例如Hadoop中的数据集。
Storm的计算单元是一个个“Bolt”和“Spout”。
Bolt用于处理数据,例如计算、聚合、过滤等操作,Spout用于产生数据流,例如从消息队列或者网络中获取数据。
Storm的工作流程是这样的:Spout从数据源获取数据,然后将数据发送给Bolt进行处理。
Bolt处理数据后,可以再次发送给下一个Bolt或者输出结果。
Storm可以支持多层嵌套的Bolt和Spout,从而实现复杂的数据处理和计算任务。
总之,Storm是一款强大的分布式实时计算系统,它可以帮助用
户快速实现数据处理和分析,提高数据处理的效率和准确性。
storm实验后的心得Storm实验后的心得在进行了一系列关于Storm的实验后,我对这个分布式实时计算系统有了更加深刻的理解和认识。
通过实验,我发现Storm具有高性能、可扩展性强、容错性好等优点,适用于处理大规模的实时数据流。
下面我将就我的实验心得进行总结和分享。
我觉得Storm的高性能是它最大的优势之一。
在实验中,我通过构建拓扑结构,将数据流分成多个阶段进行处理。
通过合理的拓扑结构设计和任务划分,我成功地将计算任务分发到不同的计算节点上,实现了并行计算。
这样一来,Storm可以同时处理多个数据流,大大提高了计算效率。
Storm的可扩展性也给我留下了深刻的印象。
在实验中,我可以根据实际需求动态地增加或减少计算节点,而无需停止整个系统。
这种灵活的扩展性使得Storm能够应对不断增长的数据规模和计算需求,保证了系统的稳定性和可靠性。
Storm的容错性也是我在实验中感受到的一大优点。
在实验过程中,我模拟了计算节点的故障情况,并观察了系统的容错能力。
我发现,当一个计算节点发生故障时,Storm会自动将该节点上的任务重新分配给其他正常的节点,确保计算任务的连续性和正确性。
这种容错机制使得Storm具有很高的可靠性,在大规模分布式计算中表现出色。
Storm还具有灵活的数据处理能力。
在实验中,我可以根据实际需求设计不同的数据处理逻辑,包括数据过滤、数据转换、数据聚合等。
通过使用Storm提供的丰富的操作接口和函数库,我能够灵活地处理各种数据类型和数据流,满足不同的业务需求。
在实验过程中,我还发现Storm的学习曲线相对较陡。
由于Storm 的架构和设计思想与传统的批处理系统有很大的差异,初学者可能需要一定的时间来适应和理解Storm的工作原理。
不过,一旦掌握了Storm的基本概念和操作方式,就能够很好地利用它进行实时数据处理和分析。
总结起来,通过对Storm的实验,我深刻地认识到了这个分布式实时计算系统的优势和特点。
storm流式计算java代码Storm是一种流式计算框架,用于处理实时数据流。
它能够高效地处理大规模数据,并具备容错和可靠性的特性。
下面我将为您介绍一些关于Storm流式计算的相关内容。
一、Storm的基本概念在Storm中,有几个核心概念需要了解。
首先是Spout,它负责从数据源获取数据并将其发送给下游的Bolt。
Bolt是数据的处理单元,可以执行各种操作,如过滤、转换和聚合等。
Bolt还可以将处理后的数据发送给其他Bolt或外部系统。
另外,Storm还引入了Topology的概念,它是一个由Spout和Bolt组成的有向无环图,用于描述数据流的处理逻辑。
二、Storm的并发模型Storm采用了并发模型来实现高效的数据处理。
它将数据划分为多个流并行地进行处理。
在Storm中,可以通过调整并发度来控制处理的速度和负载均衡。
并发度是指同时运行的Spout和Bolt的实例数,它可以根据实际需求进行配置。
三、Storm的消息传递机制在Storm中,消息是通过可靠的消息传递机制进行传递的。
这意味着消息可以被正确地传递和处理,即使系统中发生故障。
Storm使用元组(Tuple)来表示数据,元组是一个有序的值序列。
通过元组,可以在Spout和Bolt之间传递数据,并进行相应的处理操作。
四、Storm的可靠性保证Storm提供了多种机制来确保数据处理的可靠性。
首先,它使用了消息确认机制,当一个Bolt成功处理一个元组时,会向上游发送确认消息。
如果一个Bolt在规定的时间内没有收到确认消息,它会重新发送元组,以保证数据的可靠性。
此外,Storm还提供了容错机制,当一个Bolt失败时,它会被重新启动并继续处理数据。
五、Storm的应用场景Storm可以应用于各种实时数据处理场景。
例如,它可以用于实时的数据分析和监控,帮助企业及时了解业务情况并做出相应的决策。
此外,Storm还可以用于实时推荐系统,通过实时处理用户行为数据,为用户提供个性化的推荐服务。
Storm常见模式——流聚合流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程。
从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明显的区别:table join 的输入是有限的,并且join的语义是非常明确的;而流聚合的语义是不明确的并且输入流是无限的。
数据流的聚合类型跟具体的应用有关。
一些应用把两个流发出的所有的tuple都聚合起来——不管多长时间;而另外一些应用则只会聚合一些特定的tuple。
而另外一些应用的聚合逻辑又可能完全不一样。
而这些聚合类型里面最常见的类型是把所有的输入流进行一样的划分,这个在storm里面用fields grouping在相同字段上进行grouping就可以实现。
下面是对storm-starter(代码见:https:///nathanmarz/storm-starter)中有关两个流的聚合的示例代码剖析:先看一下入口类SingleJoinExample。
(1)这里首先创建了两个发射源spout,分别是genderSpout和ageSpout:FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));TopologyBuilder builder = new TopologyBuilder();builder.setSpout("gender", genderSpout);builder.setSpout("age", ageSpout);其中genderSpout包含两个tuple字段:id和gender,ageSpout包含两个tuple字段:id和age (这里流聚合就是通过将相同id的tuple进行聚合,得到一个新的输出流,包含id、gender 和age字段)。
(2)为了不同的数据流中的同一个id的tuple能够落到同一个task中进行处理,这里使用了storm中的fileds grouping在id字段上进行分组划分:builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")).fieldsGrouping("age", new Fields("id"));从中可以看到,SingleJoinBolt就是真正进行流聚合的地方。
下面我们来看看:(1)SingleJoinBolt构造时接收一个Fileds对象,其中传进的是聚合后将要被输出的字段(这里就是gender和age字段),保存到变量_outFileds中。
(2)接下来看看完成SingleJoinBolt的构造后,SingleJoinBolt在真正开始接收处理tuple之前所做的准备工作(代码见prepare方法):a)首先,将保存OutputCollector对象,创建TimeCacheMap对象,设置超时回调接口,用于tuple处理失败时fail消息;紧接着记录数据源的个数:_collector = collector;int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();_pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());_numSources = context.getThisSources().size();b)遍历TopologyContext中不同数据源,得到所有数据源(这里就是genderSpout和ageSpout)中公共的Filed字段,保存到变量_idFields中(例子中就是id字段),同时将_outFileds中字段所在数据源记录下来,保存到一张HashMap中_fieldLocations,以便聚合后获取对应的字段值。
Set<String> idFields = null;for(GlobalStreamId source: context.getThisSources().keySet()) {Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());Set<String> setFields = new HashSet<String>(fields.toList());if(idFields==null) idFields = setFields;else idFields.retainAll(setFields);for(String outfield: _outFields) {for(String sourcefield: fields) {if(outfield.equals(sourcefield)) {_fieldLocations.put(outfield, source);}}}}_idFields = new Fields(new ArrayList<String>(idFields));if(_fieldLocations.size()!=_outFields.size()) {throw new RuntimeException("Cannot find all outfields among sources");}(3)好了,下面开始两个spout流的聚合过程了(代码见execute方法):首先,从tuple中获取_idFields字段,如果不存在于等待被处理的队列_pending中,则加入一行,其中key是获取到的_idFields字段,value是一个空的HashMap<GlobalStreamId, Tuple>对象,记录GlobalStreamId到Tuple的映射。
List<Object> id = tuple.select(_idFields);GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());if(!_pending.containsKey(id)) {_pending.put(id, new HashMap<GlobalStreamId, Tuple>());}从_pending队列中,获取当前GlobalStreamId streamId对应的HashMap对象parts中:Map<GlobalStreamId, Tuple> parts = _pending.get(id);如果streamId已经包含其中,则抛出异常,接收到同一个spout中的两条一样id的tuple,否则将该streamid加入parts中:if(parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice");parts.put(streamId, tuple);如果parts已经包含了聚合数据源的个数_numSources时,从_pending队列中移除这条记录,然后开始构造聚合后的结果字段:依次遍历_outFields中各个字段,从_fieldLocations中取到这些outFiled字段对应的GlobalStreamId,紧接着从parts中取出GlobalStreamId对应的outFiled,放入聚合后的结果中。
if(parts.size()==_numSources) {_pending.remove(id);List<Object> joinResult = new ArrayList<Object>();for(String outField: _outFields) {GlobalStreamId loc = _fieldLocations.get(outField);joinResult.add(parts.get(loc).getValueByField(outField));}最后通过_collector将parts中存放的tuple和聚合后的输出结果发射出去,并ack这些tuple 已经处理成功。
_collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);for(Tuple part: parts.values()) {_collector.ack(part);}}否则,继续等待两个spout流中这个streamid都到齐后再进行聚合处理。
(4)最后,声明一下输出字段(代码见declareOutputFields方法):declarer.declare(_outFields);。