两种简单Rabbitmq使用方案及其测试
- 格式:docx
- 大小:1.46 MB
- 文档页数:10
rabbitmq应用实例RabbitMQ是一个流行的开源消息中间件,它可以帮助我们简化分布式系统之间的通信,并提高系统的可靠性。
以下是RabbitMQ的应用实例:1. 分布式任务队列在一个大型的分布式系统中,任务通常会被分配到多个节点进行并行处理。
在这种情况下,我们可以使用RabbitMQ来实现一个分布式的任务队列。
例如,一个电商网站可能需要在每天的晚上对所有商品的价格进行重新计算。
这个任务可能需要数小时才能完成,因此我们可以将这个任务分解成许多小的子任务,并将它们分配给多个节点进行处理。
节点可以使用RabbitMQ从任务队列中获取需要处理的消息,并将处理结果发送回队列中。
2. 日志收集器在一个分布式的系统中,各个节点可能会生成大量的日志文件。
为了方便管理和分析这些日志文件,我们可以使用RabbitMQ来实现一个日志收集器。
例如,一个在线电商网站可能需要追踪用户在网站上的行为。
这个过程会产生大量的日志文件,这些日志文件可能存储在不同的节点上。
我们可以使用RabbitMQ来收集这些日志文件,将它们发送到一个中央的日志处理节点,然后进行统一的处理和分析。
3. 消息推送在一个在线系统中,我们经常需要向用户发送推送通知。
我们可以使用RabbitMQ来实现一个消息推送的系统。
例如,一个在线聊天应用程序可能需要将用户之间的消息发送到相应的用户。
通过使用RabbitMQ,我们可以将消息发送到一个中央的消息队列中,然后从队列中获取消息并将其发送给相应的用户。
4. 事件驱动架构在一个分布式的系统中,我们经常需要使用事件来触发系统内的各个流程。
我们可以使用RabbitMQ来实现一个事件驱动的架构。
例如,在一个电商网站中,当一个用户下单时,我们可以使用RabbitMQ来发布一个订单事件。
这个事件可以触发其他部分的系统响应,例如库存管理系统可以更新库存,财务系统可以生成账单等。
总之,RabbitMQ是一个功能强大的消息中间件,可以帮助我们有效地组织分布式系统之间的通信。
RabbitMQ使⽤教程(⼆)RabbitMQ⽤户管理,⾓⾊管理及权限设置上⼀篇博客中,我们成功的安装好了RabbitMQ环境,并通过⼀个Java客户端⽰例了解了⽤⽣产者来发布消息,⽤消费者来消费消息。
本篇博客主要讲解下RabbitMQ如何管理⽤户(新增/删除/修改密码),如何给⽤户设置⾓⾊,如何设置⽤户权限,接下来,我们⼀⼀讲解。
1. ⽤户管理1.1 查看⽤户列表RabbitMQ安装完成后,会有⼀个默认⽤户(guest guest),那么我们如何查看⽤户列表呢?第2种⽅式是通过命令查看:cd E:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbinrabbitmqctl.bat list_users注意:第1个命令中的路径修改为你机器RabblitMQ的安装路径。
1.2 新建⽤户在实际的使⽤过程中,不可能只存在这1个⽤户,有时我们需要新建1个⽤户给到开发⼈员使⽤,⽐如新建个⽤户developer 123456。
仍然有2种⽅式,第1种⽅式是通过管理后台新建:第2种⽅式是通过命令新增:rabbitmqctl.bat add_user developer 123456细⼼的⽹友也许会问为啥⽤户guest的tags是[administrator],⽽新建的⽤户developer的tags是[]呢,别着急,这⼀点会在下⾯的2.⾓⾊设置中讲解。
1.3 删除⽤户在实际使⽤过程中,删除⽤户的场景肯定也是存在的,⽐如我想把刚刚新建的⽤户developer删除掉。
仍然有2种⽅式,第1种⽅式是通过管理后台新建:第2种⽅式是通过命令删除:rabbitmqctl.bat delete_user developer因为后⾯还要使⽤⽤户developer,删除完可以再新增回来。
1.4 修改密码可能有⼈会觉得123456这种密码,太简单了,不安全,我要修改成developer123456。
rabbitmq基本用法RabbitMQ是一个开源的消息队列系统,广泛应用于分布式系统中的消息传递和异步通信。
本文将介绍RabbitMQ的基本用法,包括安装部署、创建队列、发送和接收消息等操作。
1. 安装部署RabbitMQ的安装部署比较简单,可以在官网上下载相应的安装包,也可以使用包管理工具进行安装。
安装完成后,启动RabbitMQ 服务即可。
2. 创建队列在RabbitMQ中,消息通过队列进行传递和存储。
可以使用RabbitMQ的Web管理界面或者命令行工具创建队列。
例如,在命令行中输入以下命令就可以创建一个名为“hello”的队列:```sudo rabbitmqctl queue_declare queue=hello```3. 发送消息发送消息需要首先连接到RabbitMQ服务,然后向指定的队列发送消息。
可以使用RabbitMQ的Java客户端库或其他语言的客户端库进行开发。
以下是一个示例代码,向名为“hello”的队列发送一条消息:```Connection connection = factory.newConnection();Channel channel = connection.createChannel();String message = 'Hello RabbitMQ!';channel.basicPublish('', 'hello', null,message.getBytes());channel.close();connection.close();```4. 接收消息接收消息需要通过订阅指定的队列,然后从队列中取出消息进行处理。
可以使用RabbitMQ的Java客户端库或其他语言的客户端库进行开发。
以下是一个示例代码,从名为“hello”的队列接收消息: ```Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.basicConsume('hello', true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, 'UTF-8');System.out.println('Received: ' + message);}});```通过以上介绍,可以初步了解RabbitMQ的基本用法,包括安装部署、创建队列、发送和接收消息等操作。
rabbitmq 使用手册RabbitMQ 是一个开源的消息代理软件,它实现了高效的消息传递机制,可以在分布式系统中进行消息的发布和订阅。
下面是 RabbitMQ 的使用手册的详细精确说明:1. 安装 RabbitMQ:首先,你需要下载并安装 RabbitMQ。
你可以从RabbitMQ 官方网站上下载适合你操作系统的安装包,并按照官方文档中的说明进行安装。
2. 启动 RabbitMQ 服务器:安装完成后,你可以启动 RabbitMQ 服务器。
在大多数操作系统中,你可以通过命令行执行以下命令来启动RabbitMQ:```rabbitmq-server```如果一切正常,你将会看到 RabbitMQ 服务器成功启动的日志信息。
3. 创建和管理队列:RabbitMQ 使用队列来存储消息。
你可以使用 RabbitMQ 的管理界面或者命令行工具来创建和管理队列。
以下是一些常用的队列操作命令:- 创建队列:```rabbitmqadmin declare queue name=<queue_name>```- 查看队列列表:```rabbitmqadmin list queues```- 删除队列:```rabbitmqadmin delete queue name=<queue_name>```4. 发布和消费消息:使用 RabbitMQ,你可以将消息发布到队列中,并从队列中消费消息。
以下是一些常用的消息操作命令:- 发布消息:```rabbitmqadmin publish routing_key=<routing_key>payload=<message>```- 消费消息:```rabbitmqadmin get queue=<queue_name>```- 确认消息已被消费:```rabbitmqadmin ack delivery_tag=<delivery_tag> ```5. 设置消息交换机和绑定:RabbitMQ 使用消息交换机来将消息路由到队列。
rabbitmq项目使用实例RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)标准,可以在分布式系统中扮演消息代理的角色,用于在应用程序之间传递消息。
下面我将从多个角度介绍RabbitMQ项目的使用实例。
1. 安装和配置:首先,你需要安装RabbitMQ服务器。
安装完成后,你可以通过RabbitMQ的管理界面进行一些基本的配置,比如创建虚拟主机、用户和权限等。
你也可以通过命令行工具进行配置,比如创建队列、交换机,绑定队列等操作。
2. 生产者和消费者:在一个典型的RabbitMQ项目中,你通常会有生产者和消费者两种角色。
生产者负责向RabbitMQ发送消息,而消费者则从RabbitMQ接收消息并进行处理。
你需要编写相应的代码来实现这些功能,比如使用RabbitMQ的客户端库来连接到RabbitMQ服务器,并发送/接收消息。
3. 消息队列的使用:RabbitMQ的核心功能之一就是消息队列。
你可以创建队列,并将消息发送到队列中。
消费者可以订阅队列,并在有消息到达时进行处理。
这种模式非常适合解耦生产者和消费者,以及实现异步处理。
4. 交换机和路由:RabbitMQ中的交换机用于接收来自生产者的消息,并将消息路由到一个或多个队列。
你可以根据消息的路由键(routing key)来指定消息的路由规则,以及选择不同的路由算法,比如直接交换、主题交换等。
5. 错误处理和可靠性:在实际项目中,你需要考虑消息的可靠性和错误处理。
RabbitMQ提供了一些机制来处理这些情况,比如确认机制(acknowledgment)和消息持久化。
你可以确保消息在发送和接收过程中不会丢失,并且可以处理一些异常情况。
总的来说,RabbitMQ在实际项目中可以应用于很多场景,比如微服务架构中的服务间通信、异步任务处理、日志收集等。
通过合理的配置和代码编写,你可以充分利用RabbitMQ的功能,实现高效的消息传递和处理。
一、RabbitMQ性能测试简介RabbitMQ是一个高性能、高可靠且易于部署的消息队列中间件。
它可以用于构建分布式系统,实现不同系统之间的消息传递与通信。
在实际应用场景中,对于RabbitMQ的性能表现往往是关注的焦点之一。
为了全面了解RabbitMQ的性能,进行性能测试是非常必要的。
二、RabbitMQ性能测试参数概述RabbitMQ性能测试需要关注的参数包括以下几个方面:1. 消息大小:消息的大小会直接影响RabbitMQ的性能。
通常来说,较小的消息会比较大的消息具有更好的性能表现。
2. 消息持久化:消息是否需要持久化也是一个重要的测试参数。
如果消息需要被持久化,会增加对磁盘的I/O负载,从而影响性能。
3. 消息确认模式:RabbitMQ支持多种消息确认模式,包括ack模式和noack模式等。
不同的消息确认模式对性能的影响是不同的。
4. 并发连接数:并发连接数是指同时与RabbitMQ建立连接的客户端数量。
并发连接数的增加会对RabbitMQ的性能产生一定的影响。
5. 持久化队列和交换机:通过测试持久化队列和交换机的性能,可以评估RabbitMQ在处理持久化消息时的表现。
6. 高可用和镜像队列:测试在高可用集裙环境下RabbitMQ的性能表现,以及镜像队列对性能的影响。
7. 客户端消息确认超时时间:设置客户端消息确认的超时时间,测试在不同的超时时间下RabbitMQ的性能表现。
8. 消息发布速率:测试消息发布的速率,评估RabbitMQ在高负载情况下的性能表现。
9. 用户数量:测试不同数量的用户同时消费消息时RabbitMQ的性能。
10. TCP缓冲区大小:设置TCP缓冲区的大小,测试不同大小的TCP缓冲区对RabbitMQ性能的影响。
11. 用户预取消息数量:设置用户预取消息的数量,测试不同预取消息数量对RabbitMQ性能的影响。
12. 交换机类型:测试不同类型的交换机对RabbitMQ的性能影响。
rabbitmq应用实例RabbitMQ应用实例RabbitMQ是一种开源的消息代理软件,广泛应用于分布式系统中,用于在不同应用程序之间传递消息。
它采用AMQP协议,提供了可靠的消息传递机制,能够确保消息的可靠性和顺序性。
下面我们来看几个RabbitMQ的应用实例。
1. 订单处理系统假设有一个电商网站,用户下单后需要进行订单处理。
在这个过程中,需要将订单信息传递给库存系统、支付系统和物流系统等。
这时就可以利用RabbitMQ来实现不同系统之间的消息传递。
当用户下单时,订单系统将订单信息发送到RabbitMQ,其他系统订阅相应的消息队列,从而实现订单信息的同步处理。
2. 日志收集系统在一个分布式系统中,各个节点会产生大量的日志信息。
为了方便管理和分析这些日志,可以使用RabbitMQ来搭建日志收集系统。
每个节点将日志信息发送到RabbitMQ的消息队列中,然后日志收集服务订阅这些消息队列,将日志信息汇总到中心服务器进行存储和分析。
3. 实时数据处理系统在一些实时数据处理场景中,比如金融交易系统、在线游戏等,需要对数据进行实时处理和分发。
RabbitMQ可以作为数据流处理的中间件,将数据发送到不同的处理节点进行处理。
通过RabbitMQ的消息队列机制,可以实现数据的实时传输和处理,确保系统的高可用性和可靠性。
4. 任务调度系统在一些任务调度场景中,比如定时任务、异步任务等,可以使用RabbitMQ来实现任务的调度和执行。
任务调度系统将任务信息发送到RabbitMQ的消息队列中,工作节点订阅消息队列并执行相应的任务。
通过RabbitMQ的消息确认机制,可以确保任务的可靠执行,避免任务丢失或重复执行的情况。
总结通过以上几个应用实例,我们可以看到RabbitMQ在分布式系统中的重要作用。
它不仅可以实现不同系统之间的消息传递,还可以提高系统的可靠性和可扩展性。
因此,在设计分布式系统时,可以考虑使用RabbitMQ来解决消息传递的问题,提升系统的性能和稳定性。
rabbitmq简单⽰例(HelloWorld)⼀:消息中间件:AMQP,即Advanced Message Queuing Protocol,⾼级消息队列协议,是应⽤层协议的⼀个开放标准,为⾯向消息的中间件设计RabbitMQ是实现AMQP(⾼级消息队列协议)的消息中间件的⼀种常见的消息中间件:Kafka、RabbitMQ、RocketMQ等⼆:rabbit mq消息中间件:1.图:P:代表⽣产者,C:代表消费者,中间红⾊的是消息队列。
注:关于消息队列的好处可以去官⽹了解,可以参考数据结构中队列的特点理解。
2.HelloWorld⽰例的代码: 该简单⽰例的代码在rabbitmq官⽹也有实现:/tutorials/tutorial-one-java.html发送者:import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Send {// private final static String QUEUE_NAME = "task_event_queue";// private final static String QUEUE_NAME = "task-queue-default";private final static String QUEUE_NAME = "task-queue-default_mytest 2018-3-15";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();/** queueDeclare⽅法参数:* 第⼀个参数表⽰队列名称、第⼆个参数为是否持久化(true表⽰是,服务器重启仍存在此队列)、第三个参数为是否是独占队列(创建者可以使⽤的私有队列,断开后⾃动删除)、 * 第四个参数为当所有消费者客户端连接断开时是否⾃动删除队列、第五个参数为队列的其他参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 注:第⼆个参数要设置为true,之前设置为false,⽼是报连接超时String message = "Hello World! 2018-3-15 14:14:06";/** basicPublish⽅法参数:* 第⼀个参数为交换机名称、第⼆个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); // 发送消息,使⽤默认的direct交换器System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}}消费者:import java.io.IOException;import com.rabbitmq.client.*;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;public class Receive {private final static String QUEUE_NAME = "task-queue-default_mytest 2018-3-15";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");/** 告诉服务器我们需要那个管道的消息,如果管道中有消息,就会执⾏回调函数handleDelivery*/Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [Consumer] Received '" + message + "'");}};/** basicConsume⽅法参数:* 第⼀个参数是Consumer绑定的队列名* 第⼆个参数是⾃动确认标志,如果为true,表⽰Consumer接受到消息后,会⾃动发确认消息(Ack消息)给消息队列,消息队列会将这条消息从消息队列⾥删除 * 第三个参数就是Consumer对象,⽤于处理接收到的消息*/channel.basicConsume(QUEUE_NAME, true, consumer);channel.close();factory.clone();}}。
rabbitmq五种模式详解(含实现代码)⼀、五种模式详解1.简单模式(Queue模式)当⽣产端发送消息到交换机,交换机根据消息属性发送到队列,消费者监听绑定队列实现消息的接收和消费逻辑编写.简单模式下,强调的⼀个队列queue只被⼀个消费者监听消费.1.1 结构⽣产者:⽣成消息,发送到交换机交换机:根据消息属性,将消息发送给队列消费者:监听这个队列,发现消息后,获取消息执⾏消费逻辑1.2应⽤场景常见的应⽤场景就是⼀发,⼀接的结构例如:⼿机短信邮件单发2.争抢模式(Work模式)强调的也是后端队列与消费者绑定的结构2.1结构⽣产者:发送消息到交换机交换机:根据消息属性将消息发送给队列消费者:多个消费者,同时绑定监听⼀个队列,之间形成了争抢消息的效果2.2应⽤场景抢红包资源分配系统3.路由模式(Route模式 Direct定向)从路由模式开始,关⼼的就是消息如何到达的队列,路由模式需要使⽤的交换机类型就是路由交换机(direct)3.1 结构⽣产端:发送消息,在消息中处理消息内容,携带⼀个routingkey交换机:接收消息,根据消息的routingkey去计算匹配后端队列的routingkey队列:存储交换机发送的消息消费端:简单模式⼯作争抢3.2应⽤场景短信聊天⼯具邮箱。
⼿机号/邮箱地址,都可以是路由key4.发布订阅模式(Pulish/Subscribe模式 Fanout⼴播)不计算路由的⼀种特殊交换机4.1结构4.2应⽤场景消息推送⼴告5.主题模式(Topics模式 Tpoic通配符)路由key值是⼀种多级路径。
中国.四川.成都.武侯区5.1结构⽣产端:携带路由key,发送消息到交换机队列:绑定交换机和路由不⼀样,不是⼀个具体的路由key,⽽可以使⽤*和#代替⼀个范围| * | 字符串,只能表⽰⼀级 || --- | --- || # | 多级字符串 |交换机:根据匹配规则,将路由key对应发送到队列消息路由key:北京市.朝阳区.酒仙桥北京市.#: 匹配true上海市.浦东区.*: 没匹配false新疆.乌鲁⽊齐.#5.2 应⽤场景做物流分拣的多级传递.6.完整结构⼆、代码实现1.创建SpringBoot⼯程1.1 ⼯程基本信息1.2 依赖信息1.3 配置⽂件applicasion.properties# 应⽤名称=springboot-demo# Actuator Web 访问端⼝management.server.port=8801management.endpoints.jmx.exposure.include=*management.endpoints.web.exposure.include=*management.endpoint.health.show-details=always# 应⽤服务 WEB 访问端⼝server.port=8801######################### RabbitMQ配置 ######################### RabbitMQ主机spring.rabbitmq.host=127.0.0.1# RabbitMQ虚拟主机spring.rabbitmq.virtual-host=demo# RabbitMQ服务端⼝spring.rabbitmq.port=5672# RabbitMQ服务⽤户名ername=admin# RabbitMQ服务密码spring.rabbitmq.password=admin# RabbitMQ服务发布确认属性配置## NONE值是禁⽤发布确认模式,是默认值## CORRELATED值是发布消息成功到交换器后会触发回调⽅法## SIMPLE值经测试有两种效果,其⼀效果和CORRELATED值⼀样会触发回调⽅法,其⼆在发布消息成功后使⽤rabbitTemplate调⽤waitForConfirms或waitForConfirmsOrDie⽅法等待broker节点返回发送结果,根据返回结果来判定下⼀步的逻辑,要注意的点是wa spring.rabbitmq.publisher-confirm-type=simple# RabbitMQ服务开启消息发送确认spring.rabbitmq.publisher-returns=true######################### simple模式配置 ######################### RabbitMQ服务消息接收确认模式## NONE:不确认## AUTO:⾃动确认## MANUAL:⼿动确认spring.rabbitmq.listener.simple.acknowledge-mode=manual# 指定最⼩的消费者数量spring.rabbitmq.listener.simple.concurrency=1# 指定最⼤的消费者数量spring.rabbitmq.listener.simple.max-concurrency=1# 开启⽀持重试spring.rabbitmq.listener.simple.retry.enabled=true2.简单模式2.1 创建SimpleQueueConfig 简单队列配置类package com.gmtgo.demo.simple;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @author ⼤帅*/@Configurationpublic class SimpleQueueConfig {/*** 定义简单队列名.*/private final String simpleQueue = "queue_simple";@Beanpublic Queue simpleQueue() {return new Queue(simpleQueue);}}2.2 编写⽣产者package com.gmtgo.demo.simple;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import ponent;/*** @author ⼤帅*/@Slf4j@Componentpublic class SimpleProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage() {for (int i = 0; i < 5; i++) {String message = "简单消息" + i;("我是⽣产信息:{}", message);rabbitTemplate.convertAndSend( "queue_simple", message);}}}2.3 编写消费者package com.gmtgo.demo.simple;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import ponent;import java.io.IOException;/*** @author ⼤帅*/@Slf4j@Componentpublic class SimpleConsumers {@RabbitListener(queues = "queue_simple")public void readMessage(Message message, Channel channel) throws IOException {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);("我是消费信息:{}", new String(message.getBody()));}}2.4 编写访问类package com.gmtgo.demo.simple;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/*** @author ⼤帅*/@RestController@RequestMapping(value = "/rabbitMq")public class SimpleRabbitMqController {@Autowiredprivate SimpleProducer simpleProducer;@RequestMapping(value = "/simpleQueueTest")public String simpleQueueTest() {simpleProducer.sendMessage();return "success";}}2.5 测试启动项⽬访问 simpleQueueTest访问地址:结果:3.Work队列3.1 编写⼯作配置package com.gmtgo.demo.work;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @author ⼤帅*/@Configurationpublic class WorkQueueConfig {/*** 队列名.*/private final String work = "work_queue";@Beanpublic Queue workQueue() {return new Queue(work);}}3.2 编写⽣产者package com.gmtgo.demo.work;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import ponent;/*** @author ⼤帅*/@Slf4j@Componentpublic class WorkProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage() {for (int i = 0; i < 10; i++) {String message = "⼯作消息" + i;("我是⽣产信息:{}", message);rabbitTemplate.convertAndSend("work_queue", message);}}}3.3 编写消费者1package com.gmtgo.demo.work;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import ponent;import java.io.IOException;/*** @author ⼤帅*/@Slf4j@Componentpublic class WorkConsumers1 {@RabbitListener(queues = "work_queue")public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);("我是消费信息1:{}", new String(message.getBody()));}}3.4 编写消费者2package com.gmtgo.demo.work;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import ponent;import java.io.IOException;/*** @author ⼤帅*/@Slf4j@Componentpublic class WorkConsumers2 {@RabbitListener(queues = "work_queue")public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);("我是消费信息2:{}", new String(message.getBody()));}}3.5 编写测试⽅法package com.gmtgo.demo.work;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/*** @author ⼤帅*/@RestController@RequestMapping(value = "rabbitMq")public class WorkRabbitMqController {@Autowiredprivate WorkProducer workProducer;@RequestMapping(value = "workQueueTest")public String workQueueTest() {workProducer.sendMessage();return "success";}}3.6 测试启动项⽬访问 workQueueTest访问地址结果:控制台打印,发现10条消息 偶数条消费者1获取,奇数条消费者2获取,并且平均分配。
Rabbitmq消息队列(⼆)HelloWorld!模拟简单发送接收1、简介 RabbitMQ是消息代理:它接受和转发消息。
你可以把它当作⼀个邮局:当你把你要邮寄的邮件放在信箱⾥时,你可以肯定Postman先⽣最终会把邮件送到你的收件⼈那⾥。
在这个⽐喻中,RabbitMQ是邮局,邮局邮差。
RabbitMQ和邮局之间的主要区别是,它不处理纸张,⽽是接受、存储和转发⼆进制数据‒消息。
整体的设计如下图: java环境下,使⽤rabbitmq需要⽤到jar包:amqp-client.jar2、发送消息 想要将消息发送到队列中,我们需要进⾏下⾯⼏步: (1)建⽴服务链接1 ConnectionFactory factory = new ConnectionFactory();2 factory.setHost("localhost"); // 设置ip地址3 factory.setPort(5672); // rabbit的端⼝号,默认为5672,可以不写4 factory.setPassword("rabbit"); // 发送消息⽤户的登录密码5 factory.setUsername("rabbit"); // 发送消息⽤户的登录⽤户名6 Connection connection = factory.newConnection(); // 新建链接7 Channel channel = connection.createChannel(); // 创建channel (2)声明消息队列1 channel.queueDeclare("QUEUE_NAME", false, false, false, null); (3)调⽤⽅法发送消息1 String message = "Hello World!"; // 待发送的消息2 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); (4)发送消息过后,需要关闭链接1 channel.close();2 connection.close();3、接收消息 接下来,当⽣产者将消息发送到队列之后,我们消费者⼀⽅就可以从队列中接收到消息了: (1)建⽴服务器链接,和⽣产者⼀⽅⼀样1 ConnectionFactory factory = new ConnectionFactory();2 factory.setHost("localhost"); // 设置ip地址3 factory.setPort(5672); // rabbit的端⼝号,默认为5672,可以不写4 factory.setPassword("rabbit"); // 发送消息⽤户的登录密码5 factory.setUsername("rabbit"); // 发送消息⽤户的登录⽤户名6 Connection connection = factory.newConnection(); // 新建链接7 Channel channel = connection.createChannel(); // 创建channel (2)声明消息队列1 channel.queueDeclare("QUEUE_NAME", false, false, false, null); (3)声明回调⽅法,接收服务器从队列中发过来的消息1 Consumer consumer = new DefaultConsumer(channel) {2 @Override3public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {6 String message = new String(body, "UTF-8");7 System.out.println(" [x] Received '" + message + "'");8 }9 };10 channel.basicConsume(QUEUE_NAME, true, consumer);4、运⾏查看结果 分别运⾏⽣产者⽅和消费者⽅的代码过后,通过⽣产者⽅发送消息过后,就可以在消费者⽅接收到发送过来的数据了。
两种简单Rabbitmq使用方案及其测试
方案一简单负载均衡方案
问题:
rabbitmq可以为Consumers做负载均衡,但rabbimq自身并没有负载均衡。用户连接到rabbitmq集
群的任意节点都可以访问集群中的任意消息队列,但一个消息队列只存储在一个物理节点上,其它节
点只存储该队列的元数据,这使得当队列里只有一个队列时,系统性能受限于单个节点的网络带宽和
主机性能。若使用多个队列来提升性能,也会有新的问题,即如何在队列之间做负载均衡,同时网络
连接也会影响系统性能,比如当一个用户往某个消息队列发消息时,而该用户当前连接的节点不是该
队列真实所在的物理节点,这必然会产生rabbitmq节点间通讯,从而消耗的一部分网络带宽。
方案:
为了解决以上问题,有以下方案(发送端做负载均衡,随机发送集群中任意节点),
1.建立多个消息队列,每个物理节点上消息队列数相同。
2.exchange的类型设置为direct,建立多个binding,每个队列对应一个key。
3.每个publisher建立到每个物理节点的连接。
4.每个worker订阅所有消息队列,。
5.发送消息时随机选择一个key,并使用该key对应的队列所有在节点的连接发送该消息。
6.当某个mq节点挂掉后,发送者将消息随机发送到其余节点,并一直监控该挂掉的节点是否重
起,重启后,即可向该节点发消息。
示意图如下
P
P
P
CCC
RabbitMQRabbitMQRabbitMQ
测试:
1.测试环境
硬件环境:
发送者(my031090, rds064071,rds064072)
rabbit节点(rds064073, rds064075,rds064074)
接收者(rds064076,rds064077,my031091)
软件环境:
内核2.6.32-220.el6.x86_64
rabbitmq: 2.8.1
erlang:R16B
rabbit-client: rabbit-erlang-client
网络环境:
≈117MB/s
2.测试结果
(1)包大小:1byte
集群整体每秒传输包个数:
每个连接传输速率
各队列数据包收发速率:
(2)包大小:256k
集群整体每秒传输包个数:
各连接传输速率:
队列收发速率:
3.结论
从上述测试结果可以看出,该方案基本实习了Rabbitmq的负载均衡,在数据包大小为256k时网
络吞吐量(250MB/s)也比较理想。
方案二高可用方案
问题:
Rabbitmq现提供队列mirror功能,通过这一功能可以提高Rabbitmq的可靠性,当某个Rabbitmq
节点故障时,只要其它节点里存在该故障节点的队列镜像,该队列就能继续正常工作不会丢失数
据。但使用该功能也会有些副作用,它这种通过冗余数据保障可靠性的方式会降低系统的性能,
因为往一个队列发数据也就会往这个队列的所有镜像队列发数据,这必然产生大量Rabbitmq节点
间数据的交互,降低吞吐率,镜像越多性能必然下降越多。与此同时,为充分利用集群的的资源,
需要创建多个队列,若在所有节点上都有每个队列的镜像来实现可靠性,则队列镜像数会太多,
过多的RabbitMq集群内部网络通讯会吃掉大量网络带宽。
方案:
为解决上述问题,我们实现一个允许挂一个节点的方案,该方案在方案一的基础上加上以下2条:
1. 每个队列只有一个镜像,镜像的位置为“下一个节点”,节点的分布如下图
P
P
P
CCC
RabbitMQRabbitMQRabbitMQ
Q2Q1Q3
Q3Q2
Q1
2. 消费者端监控所有链接,当发现某个节点挂掉时,自动连接到镜像节点,而当故障节点恢
复时自动连接回来。
测试:
测试环境:与测试一相同
测试结果:
(1) 包大小1byte:
集群每秒处理包数:
各连接传输速率:
各队列数据包收发速率:
(2) 包大小(256K)
集群每秒处理包数:
各连接传输速率:
各队列数据包收发速率:
结论:
与方案一的测试结果做比较可以看出,开启mirror后吞吐量大大降低,只有不到原来的1/4,