JMS ActiveMQ交流学习 演示文稿
- 格式:pptx
- 大小:567.98 KB
- 文档页数:41
JMS开源框架ActiveMQ入门介绍基本的JMS概念与开源的JMS框架ActiveMQ应用,内容涵盖一下几点:1.基本的JMS概念2.JMS的消息模式3.介绍ActiveMQ4.一个基于ActiveMQ的JMS例子程序一:JMS基本概念1. JMS的目标为企业级的应用提供一种智能的消息系统,JMS定义了一整套的企业级的消息概念与工具,尽可能最小化的Java语言概念去构建最大化企业消息应用。
统一已经存在的企业级消息系统功能。
2. 提供者JMS提供者是指那些完全完成JMS功能与管理功能的JMS消息厂商,理论上JMS 提供者完成。
JMS消息产品必须是100%的纯Java语言实现,可以运行在跨平台的架构与操作系统上,当前一些JMS厂商包括IBM,Oracle, JBoss社区 (JBoss Community), Apache 社区(ApacheCommunity)。
3. JMS应用程序, 一个完整的JMS应用应该实现以下功能:∙JMS 客户端– Java语言开发的接受与发送消息的程序∙非JMS客户端–基于消息系统的本地API实现而不是JMS∙消息–应用程序用来相互交流信息的载体∙被管理对象–预先配置的JMS对象,JMS管理员创建,被客户端运用。
如链接工厂,主题等∙JMS提供者–完成JMS功能与管理功能的消息系统∙七七八八网二:JMS的消息模式1.点对点的消息模式(Point to Point Messaging)下面的JMS对象在点对点消息模式中是必须的:a.队列(Queue) –一个提供者命名的队列对象,客户端将会使用这个命名的队列对象b.队列链接工厂(QueueConnectionFactory) –客户端使用队列链接工厂创建链接队列ConnectionQueue来取得与JMS点对点消息提供者的链接。
c. 链接队列(ConnectionQueue) –一个活动的链接队列存在在客户端与点对点消息提供者之间,客户用它创建一个或者多个JMS队列会话(QueueSession)d. 队列会话(QueueSession) –用来创建队列消息的发送者与接受者(QueueSenderandQueueReceiver)e.消息发送者(QueueSender 或者MessageProducer)–发送消息到已经声明的队列f.消息接受者(QueueReceiver或者MessageConsumer) –接受已经被发送到指定队列的消息2.发布订阅模式(publish – subscribe Mode)a.主题Topic(Destination) –一个提供者命名的主题对象,客户端将会使用这个命名的主题对象b.主题链接工厂(TopciConnectionFactory) –客户端使用主题链接工厂创建链接主题ConnectionTopic来取得与JMS消息Pub/Sub提供者的链接。
jms与ActiveMQ实践与应用jms与ActiveMQ实践与应用2013-09-23 15:19:32| 分类: ActiveMQ|字号订阅前言这是我自己从不知道JMS为何物到学习如何使用第三方工具实现跨服务器的知识总结,在整个过程中可能考虑不全。
另外,如果想尽快使用JMS,建议直接看实例那一节就可以了。
有问题多交流。
词语解释(有些词可能用的不是很正确,在这里我把自己能意识到的词拿出来解释一下):1、跨服务器:专业术语好像叫“跨实例”。
意思是,可以在多个服务器(可以是不同的服务器,如resin与tomcat)之间相互通信。
与之对应的是单服务器版。
2、消息生产者:就是专门制造消息的类。
3、消息消费者:也叫消息接收者,它主要是实现了消息监听的一个接口,当然,也可以难过Spring提供的一个转换器接口指定任意一个类中的任意方法。
序我们都知道,任何一个系统从整体上来看,其实质就是由无数个小的服务或事件(我们可以称之为单元)有机地组合起来的。
对于系统中任何一个比较复杂的功能,都是通过调用各个独立的单元以实现统一的协调运作而实现的。
现在我们的问题是,如果有两个完全独立的服务(比如说两个不同系统间的服务)需要相互交换数据,我们该如何实现?好吧,我承认,我很傻很天真,我想到的第一个方法就是在需要的系统中将代码再写一遍,但我也知道,这绝对不现实!好吧,那我就应该好好学习学习达人们是如何去解决这样的问题。
第一种方法,估计也是用的最多的,就是rpc模式。
这种方法就是在自己的代码中远程调用其它程序中的代码以达到交换数据的目的。
但是这种方法很显然地存在了一个问题:就是一定要等到获取了数据之后才能继续下面的操作。
当然,如果一些逻辑是需要这些数据才能操作,那这就是我们需要的。
第二种方法就是Hessian,我个人觉得Hessian的实现在本质上与rpc模式的一样,只是它采用了配置,简化了代码。
上面这两个方法,基本上能解决所有的远程调用的问题了。
1JMSJMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。
Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。
1.1JMS的基本构件1.1.1连接工厂连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。
1.1.2连接JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。
1.1.3会话JMS Session是生产和消费消息的一个单线程上下文。
会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。
会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
1.1.4目的地目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。
JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。
点对点消息传递域的特点如下:每个消息只能有一个消费者。
消息的生产者和消费者之间没有时间上的相关性。
无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。
发布/订阅消息传递域的特点如下:每个消息可以有多个消费者。
生产者和消费者之间有时间上的相关性。
订阅一个主题的消费者只能消费自它订阅之后发布的消息。
JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。
持久订阅允许消费者消费它在未处于激活状态时发送的消息。
在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。
1.1.5消息生产者消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。
1.1.6消息消费者消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。
消息的消费可以采用以下两种方法之一:同步消费。
ActiveMQ学习教程背景:ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。
选择ActiveMQ作为JMS的入门学习中间件,是因为其拥有以下优点1.多种语言和协议编写客户端。
语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。
应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP2.完全支持JMS1.1和J2EE 1.4规范(持久化,XA消息,事务)3.对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性4.完全支持JMS1.1和J2EE 1.4规范(持久化,XA消息,事务)5.通过了常见J2EE服务器(如Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上6.支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA7.从设计上保证了高性能的集群,客户端-服务器,点对点8.支持Ajax9.支持与Axis的整合10.可以很容易得调用内嵌JMS provider,进行测试学会了ActiveMQ之后,其它供应商的MQ也可以在短时间内快速上手。
安装:ActiveMQ(本文简称MQ)要求JDK1.5以上,推荐1.6以上版本。
还没安装JDK的朋友,请先安装,在此不赘诉了。
安装完JDK后,从/download.html下载MQ的最新版本,本教程使用版本为5.5。
解压后,可以看到MQ目录下有以下文件和目录activemq-all-5.5.0.jar:所有MQ JAR包的集合,用于用户系统调用bin:其中包含MQ的启动脚本conf:包含MQ的所有配置文件data:日志文件及持久性消息数据example:MQ的示例lib:MQ运行所需的所有Libwebapps:MQ的Web控制台及一些相关的DEMO启动MQ:双击bin目录下的activemq.bat文件即可启动MQ第一个示例:新建一个JAVA工程,引用activemq-all-5.5.0.jar,SLFAPI其及对应版本LOG4J的JAR 包(懒的上网找的到附件里下载)Publisher.javaJava代码1import java.util.Hashtable;2import java.util.Map;34import javax.jms.Connection;5import javax.jms.ConnectionFactory;6import javax.jms.Destination;7import javax.jms.JMSException;8import javax.jms.MapMessage;9import javax.jms.Message;10import javax.jms.MessageProducer;11import javax.jms.Session;1213import org.apache.activemq.ActiveMQConnectionFactory;14import mand.ActiveMQMapMessage;1516public class Publisher {1718protected int MAX_DELTA_PERCENT = 1;19protected Map<String, Double> LAST_PRICES = new Hashtable<String, Double>();20protected static int count = 10;21protected static int total;2223protected static String brokerURL = "tcp://localhost:61616";24protected static transient ConnectionFactory factory;25protected transient Connection connection;26protected transient Session session;27protected transient MessageProducer producer;2829public Publisher() throws JMSException {30factory = new ActiveMQConnectionFactory(brokerURL);31connection = factory.createConnection();32try {33connection.start();34} catch (JMSException jmse) {35connection.close();36throw jmse;37}38session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);39producer = session.createProducer(null);40}4142public void close() throws JMSException {43if (connection != null) {44connection.close();45}46}4748public static void main(String[] args) throws JMSException {49Publisher publisher = new Publisher();50while (total < 1000) {51for (int i = 0; i < count; i++) {52publisher.sendMessage(args);53}54total += count;55System.out.println("Published '" + count + "' of '" + total + "' price56try {57Thread.sleep(1000);58} catch (InterruptedException x) {59}60}61publisher.close();62}6364protected void sendMessage(String[] stocks) throws JMSException { 65int idx = 0;66while (true) {67idx = (int)Math.round(stocks.length * Math.random());68if (idx < stocks.length) {69break;70}71}72String stock = stocks[idx];73Destination destination = session.createTopic("STOCKS." + stock); 74Message message = createStockMessage(stock, session);75System.out.println("Sending: " +((ActiveMQMapMessage)message).getContentMap() + " on destination: " +76producer.send(destination, message);77}7879protected Message createStockMessage(String stock, Session session) throws JMSException {80Double value = LAST_PRICES.get(stock);81if (value == null) {82value = new Double(Math.random() * 100);83}8485// lets mutate the value by some percentage86double oldPrice = value.doubleValue();87value = new Double(mutatePrice(oldPrice));88LAST_PRICES.put(stock, value);89double price = value.doubleValue();9091double offer = price * 1.001;9293boolean up = (price > oldPrice);9495MapMessage message = session.createMapMessage();96message.setString("stock", stock);97message.setDouble("price", price);98message.setDouble("offer", offer);99message.setBoolean("up", up);100return message;101}102103protected double mutatePrice(double price) {104double percentChange = (2 * Math.random() * MAX_DELTA_PERCENT) - MAX_DELTA_PERCENT;105106return price * (100 + percentChange) / 100;107}108109}Consumer.javaJava代码110import javax.jms.Connection;111import javax.jms.ConnectionFactory;112import javax.jms.Destination;113import javax.jms.JMSException;114import javax.jms.MessageConsumer;115import javax.jms.Session;116117import org.apache.activemq.ActiveMQConnectionFactory;118119public class Consumer {120121private static String brokerURL = "tcp://localhost:61616"; 122private static transient ConnectionFactory factory;123private transient Connection connection;124private transient Session session;125126public Consumer() throws JMSException {127factory = new ActiveMQConnectionFactory(brokerURL); 128connection = factory.createConnection();129connection.start();130session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);131}132133public void close() throws JMSException {134if (connection != null) {135connection.close();136}137}138139public static void main(String[] args) throws JMSException {140Consumer consumer = new Consumer();141for (String stock : args) {142Destination destination =consumer.getSession().createTopic("STOCKS." + stock);143MessageConsumer messageConsumer =consumer.getSession().createConsumer(destination);144messageConsumer.setMessageListener(new Listener());145}146}147148public Session getSession() {149return session;150}151152}Listener.javaJava代码153import java.text.DecimalFormat;154155import javax.jms.MapMessage;156import javax.jms.Message;157import javax.jms.MessageListener;158159public class Listener implements MessageListener {160161public void onMessage(Message message) {162try {163MapMessage map = (MapMessage)message;164String stock = map.getString("stock");165double price = map.getDouble("price");166double offer = map.getDouble("offer");167boolean up = map.getBoolean("up");168DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" ); 169System.out.println(stock + "\t" + df.format(price) + "\t" +df.format(offer) + "\t" + (up?"up":"down"));170} catch (Exception e) {171 e.printStackTrace();172}173}174}先运行Consumer.java, 输入参数ORCL,然后运行Publisher.java,输入参数ORCL,就可以看到Publisher在发送消息,Consumer在接收消息了。
JMS与MQ详解《一》1.ActiveMQ概述企业消息软件从80年代起就存在,它不只是一种应用间消息传递风格,也是一种集成风格。
因此,消息传递可以满足应用间的通知和互相操作。
但是开源的解决方案是到最近10年才出现的。
Apache ActiveMQ就是其中一种。
它使应用间能以异步,松耦合方式交流。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。
ActiveMQ是Apache软件基金下的一个开源软件,它遵循JMS 规范(Java Message Service),是消息驱动中间件软件(MOM)。
它为企业消息传递提供高可用,出色性能,可扩展,稳定和安全保障。
ActiveMQ使用Apache许可协议。
因此,任何人都可以使用和修改它而不必反馈任何改变。
这对于商业上将ActiveMQ用在重要用途的人尤为关键。
MOM的工作是在分布式的各应用之间调度事件和消息,使之到达指定的接收者。
所以高可用,高性能,高可扩展性尤为关键。
2.ActiveMQ特性⒈支持多种语言客户端,如:Java,C,C ,C#,Ruby,Perl,Python,PHP。
应用协议有OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
⒉ 完全支持JMS1.1和J2EE1.4规范,它们包括同步和异步消息传递,一次和只有一次的消息传递,对于预订者的持久消息等等。
依附于JMS规范意味着,不论JMS消息提供者是谁,同样的基本特性(持久化,XA消息,事务)都是有效的。
⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring 的系统里面去。
⒋ 通过了常见J2EE服务器(如Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4 商业服务器上。
⼀、ActiveMQ学习笔记(基础概念)刚准备把使⽤ActiveMQ过程中遇到的问题,以及解决⽅法整理出来的时候,去看了下官⽹,发现ActiveMQ Artemis已经发布好久了。
1、JMS概念JMS即Java消息服务(Java Message Service)应⽤程序接⼝,是⼀个Java平台中关于⾯向消息中间件(MOM)的API,⽤于在两个应⽤程序之间,或分布式系统中发送消息,进⾏异步通信。
JMS是⼀种与⼚商⽆关的 API,绝⼤多数MOM提供商都对JMS提供⽀持,⽤来访问收发系统消息,它类似于JDBC(Java Database Connectivity)。
2、JMS元素提供者(Provider):实现JMS的消息服务中间件服务器。
常⽤的如:Apache ActiveMQ、IBM WebSphere MQ。
客户端:发送或接受消息的应⽤。
⽣产者(Producer):创建并发送消息的JMS客户。
消费者(Consumer):接收并处理消息的JMS客户。
消息(Message):应⽤程序之间传递的数据。
队列(Queue):⼀个容纳那些被发送的等待阅读的消息的区域。
与队列名字所暗⽰的意思不同,消息的接受顺序并不⼀定要与消息的发送顺序相同。
⼀旦⼀个消息被阅读,该消息将被从队列中移⾛。
主题(Topic):⼀种⽀持发送消息给多个订阅者的机制。
3、JMS对象连接⼯⼚(ConnectionFactory):创建Connection对象的⼯⼚。
客户端使⽤JNDI查找连接⼯⼚,然后利⽤连接⼯⼚创建⼀个JMS连接。
连接(Connection):客户端和服务器端之间的⼀个活动的连接,是由客户端通过调⽤连接⼯⼚的⽅法建⽴的。
会话(Session):表⽰客户与服务器之间的会话状态。
会话建⽴在连接上,表⽰客户与服务器之间的⼀个会话线程。
⽬的(Destination):消息⽣产者的消息发送⽬标或者说消息消费者的消息来源。
⽣产者(Message Producer):消息⽣产者由Session创建,并⽤于将消息发送到Destination。
1.队列和主题1.1概念在MQ中,消息模型有两种,一种是队列(Queue,一种是主题(Topic。
队列是Point-To-Point的,队列中的消息,仅能被消费一次。
主题是Pub/Sub模型,主题中的消息,可以由多个订阅者消费;订阅者只能消费它订阅以后的消息。
这是遵循的JMS规范。
1.2收发消息对象创建过程如上图所示,JMS规范中,收发消息的对象创建过程如下,下面的示例代码中也将注释这些过程:1. 初始化ConnetionFactory2. ConnetionFactory创建Connection3. Connection创建Session4. Session创建Destination(包括Queue 和Topic两种5.发:Session创建消息生产者MessageProducer(收:Session创建消息消费者MessageConsumer6.Seesion创建Message,(发:MessageProducer发送到Destination,(收: MessageConsumer从Destination接受消息。
1.3接口间的关系JMS规范定义了通用接口(JMS Common Interfaces、队列接口(PTP-specificInterfaces和主题接口(Pub/Sub-specific Interfaces,队列接口和主题接口分别继承于通用接口,具体关系如下表所示。
ActiveMQ对这些规范接口都有相应的实现。
在实际的编程过程中,声明通用接口基本就够用了。
如何区分Queue和Topic也很简单,参看下面的代码。
?1 2 3 4 5 //Queue,队列Destination destination = session.createQueue(subject;//Topic,主题Destination destination = session.createTopic(subject;2.通过队列发送和接受消息运行代码的时候,可以先run起来接受消息的程序,再run发送消息的程序,来观察消息发送的过程。
消息中间件系列⼀:⼊门、JMS规范、ActiveMQ使⽤⼀、⼊门1. 消息中间件的定义没有标准定义,⼀般认为,采⽤消息传送机制/消息队列的中间件技术,进⾏数据交流,⽤在分布式系统的集成2. 为什么要⽤消息中间件解决分布式系统之间消息的传递。
电商场景:⽤户下单减库存,调⽤物流系统。
随着业务量的增⼤,需要对系统进⾏拆分(服务化和业务拆分),拆分后的系统之间的交互⼀般⽤RPC(远程过程调⽤)。
如果系统扩充到有⼏⼗个接⼝,就需要⽤消息中间件来解决问题。
3. 消息中间件和RPC有什么区别3.1 功能特点:在架构上,RPC和Message的差异点:Message有⼀个中间结点Message Queue,可以把消息存储。
3.2 消息的特点Message Queue把请求的压⼒保存⼀下,逐渐释放出来,让处理者按照⾃⼰的节奏来处理。
Message Queue引⼊⼀下新的结点,让系统的可靠性会受Message Queue结点的影响。
Message Queue是异步单向的消息。
发送消息设计成是不需要等待消息处理的完成。
所以对于有同步返回需求,⽤Message Queue则变得⿇烦了。
3.3 PRC的特点同步调⽤,对于要等待返回结果/处理结果的场景,RPC是可以⾮常⾃然直觉的使⽤⽅式。
# RPC也可以是异步调⽤。
由于等待结果,Consumer(Client)会有线程消耗。
如果以异步RPC的⽅式使⽤,Consumer(Client)线程消耗可以去掉。
但不能做到像消息⼀样暂存消息/请求,压⼒会直接传导到服务Provider。
3.4 适⽤场合说明希望同步得到结果的场合,RPC合适。
希望使⽤简单,则RPC;RPC操作基于接⼝,使⽤简单,使⽤⽅式模拟本地调⽤。
异步的⽅式编程⽐较复杂。
不希望发送端(RPC Consumer、Message Sender)受限于处理端(RPC Provider、Message Receiver)的速度时,使⽤Message Queue。