RabbitMQ路由选择
- 格式:pdf
- 大小:700.24 KB
- 文档页数:7
RabbitMQ中exchange、route、queue的关系从AMQP协议可以看出,MessageQueue、Exchange和Binding构成了AMQP协议的核⼼,下⾯我们就围绕这三个主要组件从应⽤使⽤的⾓度全⾯的介绍如何利⽤Rabbit MQ构建消息队列以及使⽤过程中的注意事项。
1. 声明MessageQueue在Rabbit MQ中,⽆论是⽣产者发送消息还是消费者接受消息,都⾸先需要声明⼀个MessageQueue。
这就存在⼀个问题,是⽣产者声明还是消费者声明呢?要解决这个问题,⾸先需要明确:a)消费者是⽆法订阅或者获取不存在的MessageQueue中信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
在明⽩了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,⽣产者已发送的消息被丢弃的隐患。
如果应⽤能够通过消息重发的机制允许消息丢失,则使⽤此⽅案没有任何问题。
但是如果不能接受该⽅案,这就需要⽆论是⽣产者还是消费者,在发送或者接受消息前,都需要去尝试建⽴消息队列。
这⾥有⼀点需要明确,如果客户端尝试建⽴⼀个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建⽴成功的。
如果⼀个消费者在⼀个信道中正在监听某⼀个队列的消息,Rabbit MQ是不允许该消费者在同⼀个channel去声明其他队列的。
Rabbit MQ中,可以通过queue.declare命令声明⼀个队列,可以设置该队列以下属性:a) Exclusive:排他队列,如果⼀个队列被声明为排他队列,该队列仅对⾸次声明它的连接可见,并在连接断开时⾃动删除。
这⾥需要注意三点:其⼀,排他队列是基于连接可见的,同⼀连接的不同信道是可以同时访问同⼀个连接创建的排他队列的。
其⼆,“⾸次”,如果⼀个连接已经声明了⼀个排他队列,其他连接是不允许建⽴同名的排他队列的,这个与普通队列不同。
RabbitMq之路由键模糊匹配前⾔路由键模糊匹配就是可以使⽤正则表达式,和常⽤的正则表⽰式不同,这⾥的话“#”表⽰所有、全部的意思;“*”只匹配到⼀个词。
看完⽰例就能明⽩了。
实例的功能⼤概是这样:⽐如你有个知⼼好朋友,不管开⼼、伤⼼、⼯作上的还是⽣活上的事情都可以和她说;还有⼀些朋友可以分享开⼼的事情;还有⼀些朋友,你可以把不开⼼的事情和她说。
路由键routings = [ 'happy.work', 'happy.life' , 'happy.work.teacher', 'sad.work', 'sad.life', 'sad.work.teacher' ]"#":匹配所有的路由键"happy.#":匹配 'happy.work', 'happy.life' , 'happy.work.teacher'"work.#":⽆匹配“happy.*”:匹配 'happy.work', 'happy.life'"*.work":匹配 'happy.work', 'sad.work'"*.work.#":匹配 'happy.work', 'happy.work.teacher', 'sad.work', 'sad.work.teacher'代码发送端import pikahostname = '127.0.0.1'parameters = pika.ConnectionParameters(hostname)connection = pika.BlockingConnection(parameters)# 创建通道channel = connection.channel()# 定义交换机,设置类型为topicchannel.exchange_declare(exchange='test_queue_topic', exchange_type='topic')# 定义三个路由键routings = ['happy.work', 'happy.life', 'sad.work', 'sad.life']# 将消息依次发送到交换机,并设置路由键for routing in routings:message = '%s message.' % routingchannel.basic_publish(exchange='test_queue_topic', routing_key=routing, body=message)print(message)connection.close()接受端import sys, pikahostname = '127.0.0.1'parameters = pika.ConnectionParameters(hostname)connection = pika.BlockingConnection(parameters)# 创建通道channel = connection.channel()# 定义交换机,设置类型为topicchannel.exchange_declare(exchange='test_queue_topic', exchange_type='topic')# 从命令⾏获取路由键参数,如果没有,则设置为inforoutings = sys.argv[1:]if not routings:routings = ['info']# ⽣成临时队列,result = channel.queue_declare(queue='test_queue_topic', exclusive=True) # exclusive=True 当接收端退出的时候会销毁那个临时创建的队列queue_name = result.method.queuefor routing in routings:# 绑定到交换机上,设置路由键channel.queue_bind(exchange='test_queue_topic', queue=queue_name, routing_key=routing) def callback(ch, method, properties, body):print(" [x] Received %r" % (body,))channel.basic_consume(queue='test_queue_topic', # 指定队列名on_message_callback=callback, # 从队列⾥获取消息auto_ack=False # mq服务器挂掉防⽌任务丢失)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()代码测试发送端接受端workhappy所有。
计算机专业面试题计算机专业面试题(篇1)1、什么是ActiveMQ?activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。
2、Activemq的瓶颈值根据网上一般评测文档上来看,每秒的消息吞吐在20__以上,acticemq也可以集群化部署,也是使用zookeeper来搭建。
3、ActiveMQ服务器宕机怎么办?这得从ActiveMQ的储存机制说起。
在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的节点中配置。
但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ会将内存中的非持久化消息写入临时文件中,以腾出内存。
虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除。
那如果文件增大到达了配置中的最大限制的时候会发生什么?我做了以下实验:设置2G左右的持久化文件限制,大量生产持久化消息直到文件达到最大限制,此时生产者阻塞,但消费者可正常连接并消费消息,等消息消费掉一部分,文件删除又腾出空间之后,生产者又可继续发送消息,服务自动恢复正常。
设置2G左右的临时文件限制,大量生产非持久化消息并写入临时文件,在达到最大限制时,生产者阻塞,消费者可正常连接但不能消费消息,或者原本慢速消费的消费者,消费突然停止。
整个系统可连接,但是无法提供服务,就这样挂了。
具体原因不详,解决方案:尽量不要用非持久化消息,非要用的话,将临时文件限制尽可能的调大。
4、AcitveMQ的作用、原理?(生产者、消费者、p2p、订阅实现流程)Activemq的作用就是系统之间进行通信。
当然可以使用其他方式进行系统间通信,如果使用Activemq的话可以对系统之间的调用进行解耦,实现系统间的异步通信。
原理就是生产者生产消息,把消息发送给activemq。
rabbitmq的常见用法
RabbitMQ是一种开源的消息队列系统,它使用AMQP(高级消息队列协议)作为标准,可以实现消息的可靠传递、解耦和灵活性。
以下是RabbitMQ的一些常见用法:
1. 消息队列:RabbitMQ可以作为消息队列使用,发送方将消息发送到队列中,接收方从队列中获取并处理消息。
这种用法可以实现异步通信和分布式系统中的解耦。
2. 发布/订阅模式:RabbitMQ支持发布/订阅模式,发送方将消息发布到主题或交换机,多个接收方可以订阅该主题或交换机,以获取并处理消息。
这种模式可以实现一对多通信,适用于广播、日志收集等场景。
3. 路由模式:RabbitMQ还支持路由模式,发送方通过特定的路由键将消息发送到交换机,然后交换机将消息路由到一个或多个队列。
接收方可以从对应的队列中获取并处理消息。
这种模式可以实现灵活的路由控制和优先级设置。
4. 工作队列:RabbitMQ可以作为工作队列使用,任务的生产者将任务发布到队列中,多个任务消费者从队列中获取任务并执行。
这种用法可以实现任务的分布式处理和负载均衡。
5. 消息确认机制:RabbitMQ支持消息确认机制,接收方在处理完消息后可以向队列发送确认消息,以确保消息被成功处理。
这种机制可以提高消息处理的可靠性和稳定性。
6. 死信队列和重试机制:RabbitMQ还支持死信队列和重试机制,当接收方在处理消息时发生异常或失败,可以将消息发送到死信队列或进行重试。
这种机制可以提高系统的鲁棒性和容错能力。
总之,RabbitMQ作为一种灵活的消息队列系统,提供了多种用法和功能,适用于各种场景和需求。
Rabbitmqchannel参数详解1、Channel1.1 channel.exchangeDeclare():type:有direct、fanout、topic三种:fanoutfanout类型的Exchange路由规则⾮常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中directdirect类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
topic规则就是模糊匹配,可以通过通配符满⾜⼀部分规则就可以传送。
它的约定是:routing key为⼀个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每⼀段独⽴的字符串称为⼀个单词),如“d.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key与routing key⼀样也是句点号“. ”分隔的字符串。
binding key中可以存在两种特殊字符“”与“#”,⽤于做模糊匹配,其中“”⽤于匹配⼀个单词,“#”⽤于匹配多个单词(可以是零个)durable:true、false true:服务器重启会保留下来Exchange。
警告:仅设置此选项,不代表消息持久化。
即不保证重启后消息还在。
原⽂:true if we are declaring a durable exchange (the exchange will survive a server restart)autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange。
原⽂1:true if the server should delete the exchange when it is no longer in use。
/*** Declare an exchange.* @see com.rabbitmq.client.AMQP.Exchange.Declare* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk* @param exchange the name of the exchange* @param type the exchange type* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)* @param autoDelete true if the server should delete the exchange when it is no longer in use* @param arguments other properties (construction arguments) for the exchange* @return a declaration-confirm method to indicate the exchange was successfully declared* @throws java.io.IOException if an error is encountered*/Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException;1.2 chanel.basicQos()prefetchSize:0prefetchCount:会告诉RabbitMQ不要同时给⼀个消费者推送多于N个消息,即⼀旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack global:true\false 是否将上⾯设置应⽤于channel,简单点说,就是上⾯限制是channel级别的还是consumer级别备注:据说prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究 QoS = quality-of-service,顾名思义,服务的质量。
RabbitMQ的工作队列和路由工作队列:Working Queue工作队列这个概念与简单的发送/接收消息的区别就是:接收方接收到消息后,可能需要花费更长的时间来处理消息,这个过程就叫一个Work/Task。
几个概念分配:多个接收端接收同一个Queue时,如何分配?消息确认:Server端如何确定接收方的Work已经对消息进行了完整的处理?消息持久化:发送方、服务端Queue如何对未处理的消息进行磁盘持久化?Round-robin分配多个接收端接收同一个Queue时,采用了Round-robin分配算法,即轮叫调度——依次分配给各个接收方。
消息确认默认开启了消息确认(接收方接收到消息后,立即向服务器发回确认)。
消息接收方处理完消息后,向服务器发送消息确认,服务器再删除该消息。
对于耗时的work,可以先关闭自动消息确认,在work完成后,再手动发回确认。
channel.basicConsume("hello",false/*关闭自动消息确认*/,consumer);// ...work完成后channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);持久化1. Server端的Queue持久化注意的是,如果已经声明了同名非持久化的Queue,则再次声明无效。
发送方和接收方都需要指定该参数。
boolean durable = true;channel.queueDeclare("task_queue", durable, false, false, null);2. Message持久化channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());负载分配为了解决各个接收端工作量相差太大的问题(有的一直busy,有的空闲比较多),突破Round-robin。
RabbitMQ 七种队列模式应用场景案例分析(通俗易懂)七种模式介绍与应用场景简单模式(Hello World)做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人工作队列模式(Work queues)在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理应用场景:一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况订阅模式(Publish/Subscribe)一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。
应用场景:更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:•一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列•一个缓存消息队列对应着多个缓存消费者•一个数据库消息队列对应着多个数据库消费者路由模式(Routing)有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息应用场景:如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息主题模式(Topics)根据主题(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。
应用场景:同上,iphone促销活动可以接收主题为iphone的消息,如iphone12、iphone13等远程过程调用(RPC)如果我们需要在远程计算机上运行功能并等待结果就可以使用RPC,具体流程可以看图。
rabbitmq使用手册RabbitMQ是一种开源的消息队列中间件,采用AMQP协议,被广泛应用于构建可靠、高效的分布式系统。
本手册将详细介绍RabbitMQ 的安装、配置、使用和常见问题解决方案,帮助读者快速上手使用RabbitMQ。
第一章安装与配置1.1 环境准备在开始安装RabbitMQ之前,需要确保系统满足以下要求:操作系统(例如Linux、Windows)、Erlang运行时环境以及RabbitMQ软件包。
1.2 安装RabbitMQ按照文档提供的方式,在所选的操作系统上安装RabbitMQ。
安装过程中需注意版本兼容性和安全配置。
1.3 配置RabbitMQ在安装完成后,需要对RabbitMQ进行适当的配置。
主要包括网络配置、认证与授权、虚拟主机、交换机和队列的创建等。
第二章消息发布与订阅2.1 消息生产者通过使用RabbitMQ的API,开发者可以编写生产者代码将消息发布到RabbitMQ的交换机上。
这里需要注意消息的序列化和指定交换机名称。
2.2 消息消费者RabbitMQ的消费者通过订阅交换机的队列来接收消息,可以使用RabbitMQ的API编写消费者代码,并实现消息的处理逻辑。
2.3 消息确认机制RabbitMQ提供了消息的确认机制,确保消息在传输过程中的可靠性。
开发者可以选择隐式确认或显式确认来保证消息的消费状态。
第三章消息路由与过滤3.1 路由模式RabbitMQ支持多种路由模式,如直接路由、主题路由和广播路由。
开发者可以根据实际需求选择最适合的路由模式。
3.2 消息过滤通过使用RabbitMQ的消息过滤功能,可以根据消息的属性进行过滤,只有满足条件的消息才会被消费者接收。
第四章高级特性与扩展4.1 持久化使用RabbitMQ的持久化机制,可以确保消息在服务器重启后依然存在,防止消息丢失。
4.2 集群与高可用通过搭建RabbitMQ集群,可以提高系统的可用性和扩展性。
在集群中,消息将自动在节点之间进行复制。
rabbitmq五种模式详解(含实现代码)⼀、五种模式详解1.简单模式(Queue模式)当⽣产端发送消息到交换机,交换机根据消息属性发送到队列,消费者监听绑定队列实现消息的接收和消费逻辑编写.简单模式下,强调的⼀个队列queue只被⼀个消费者监听消费.1.1 结构⽣产者:⽣成消息,发送到交换机交换机:根据消息属性,将消息发送给队列消费者:监听这个队列,发现消息后,获取消息执⾏消费逻辑1.2应⽤场景常见的应⽤场景就是⼀发,⼀接的结构例如:⼿机短信邮件单发2.争抢模式(Work模式)强调的也是后端队列与消费者绑定的结构2.1结构⽣产者:发送消息到交换机交换机:根据消息属性将消息发送给队列消费者:多个消费者,同时绑定监听⼀个队列,之间形成了争抢消息的效果2.2应⽤场景抢红包资源分配系统3.路由模式(Route模式 Direct定向)从路由模式开始,关⼼的就是消息如何到达的队列,路由模式需要使⽤的交换机类型就是路由交换机(direct)3.1 结构⽣产端:发送消息,在消息中处理消息内容,携带⼀个routingkey交换机:接收消息,根据消息的routingkey去计算匹配后端队列的routingkey队列:存储交换机发送的消息消费端:简单模式⼯作争抢3.2应⽤场景短信聊天⼯具邮箱。
⼿机号/邮箱地址,都可以是路由key4.发布订阅模式(Pulish/Subscribe模式 Fanout⼴播)不计算路由的⼀种特殊交换机4.1结构4.2应⽤场景消息推送⼴告5.主题模式(Topics模式 Tpoic通配符)路由key值是⼀种多级路径。
中国.四川.成都.武侯区5.1结构⽣产端:携带路由key,发送消息到交换机队列:绑定交换机和路由不⼀样,不是⼀个具体的路由key,⽽可以使⽤*和#代替⼀个范围| * | 字符串,只能表⽰⼀级 || --- | --- || # | 多级字符串 |交换机:根据匹配规则,将路由key对应发送到队列消息路由key:北京市.朝阳区.酒仙桥北京市.#: 匹配true上海市.浦东区.*: 没匹配false新疆.乌鲁⽊齐.#5.2 应⽤场景做物流分拣的多级传递.6.完整结构⼆、代码实现1.创建SpringBoot⼯程1.1 ⼯程基本信息1.2 依赖信息1.3 配置⽂件applicasion.properties# 应⽤名称=springboot-demo# Actuator Web 访问端⼝management.server.port=8801management.endpoints.jmx.exposure.include=*management.endpoints.web.exposure.include=*management.endpoint.health.show-details=always# 应⽤服务 WEB 访问端⼝server.port=8801######################### RabbitMQ配置 ######################### RabbitMQ主机spring.rabbitmq.host=127.0.0.1# RabbitMQ虚拟主机spring.rabbitmq.virtual-host=demo# RabbitMQ服务端⼝spring.rabbitmq.port=5672# RabbitMQ服务⽤户名ername=admin# RabbitMQ服务密码spring.rabbitmq.password=admin# RabbitMQ服务发布确认属性配置## NONE值是禁⽤发布确认模式,是默认值## CORRELATED值是发布消息成功到交换器后会触发回调⽅法## SIMPLE值经测试有两种效果,其⼀效果和CORRELATED值⼀样会触发回调⽅法,其⼆在发布消息成功后使⽤rabbitTemplate调⽤waitForConfirms或waitForConfirmsOrDie⽅法等待broker节点返回发送结果,根据返回结果来判定下⼀步的逻辑,要注意的点是wa spring.rabbitmq.publisher-confirm-type=simple# RabbitMQ服务开启消息发送确认spring.rabbitmq.publisher-returns=true######################### simple模式配置 ######################### RabbitMQ服务消息接收确认模式## NONE:不确认## AUTO:⾃动确认## MANUAL:⼿动确认spring.rabbitmq.listener.simple.acknowledge-mode=manual# 指定最⼩的消费者数量spring.rabbitmq.listener.simple.concurrency=1# 指定最⼤的消费者数量spring.rabbitmq.listener.simple.max-concurrency=1# 开启⽀持重试spring.rabbitmq.listener.simple.retry.enabled=true2.简单模式2.1 创建SimpleQueueConfig 简单队列配置类package com.gmtgo.demo.simple;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @author ⼤帅*/@Configurationpublic class SimpleQueueConfig {/*** 定义简单队列名.*/private final String simpleQueue = "queue_simple";@Beanpublic Queue simpleQueue() {return new Queue(simpleQueue);}}2.2 编写⽣产者package com.gmtgo.demo.simple;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import ponent;/*** @author ⼤帅*/@Slf4j@Componentpublic class SimpleProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage() {for (int i = 0; i < 5; i++) {String message = "简单消息" + i;("我是⽣产信息:{}", message);rabbitTemplate.convertAndSend( "queue_simple", message);}}}2.3 编写消费者package com.gmtgo.demo.simple;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import ponent;import java.io.IOException;/*** @author ⼤帅*/@Slf4j@Componentpublic class SimpleConsumers {@RabbitListener(queues = "queue_simple")public void readMessage(Message message, Channel channel) throws IOException {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);("我是消费信息:{}", new String(message.getBody()));}}2.4 编写访问类package com.gmtgo.demo.simple;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/*** @author ⼤帅*/@RestController@RequestMapping(value = "/rabbitMq")public class SimpleRabbitMqController {@Autowiredprivate SimpleProducer simpleProducer;@RequestMapping(value = "/simpleQueueTest")public String simpleQueueTest() {simpleProducer.sendMessage();return "success";}}2.5 测试启动项⽬访问 simpleQueueTest访问地址:结果:3.Work队列3.1 编写⼯作配置package com.gmtgo.demo.work;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @author ⼤帅*/@Configurationpublic class WorkQueueConfig {/*** 队列名.*/private final String work = "work_queue";@Beanpublic Queue workQueue() {return new Queue(work);}}3.2 编写⽣产者package com.gmtgo.demo.work;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import ponent;/*** @author ⼤帅*/@Slf4j@Componentpublic class WorkProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage() {for (int i = 0; i < 10; i++) {String message = "⼯作消息" + i;("我是⽣产信息:{}", message);rabbitTemplate.convertAndSend("work_queue", message);}}}3.3 编写消费者1package com.gmtgo.demo.work;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import ponent;import java.io.IOException;/*** @author ⼤帅*/@Slf4j@Componentpublic class WorkConsumers1 {@RabbitListener(queues = "work_queue")public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);("我是消费信息1:{}", new String(message.getBody()));}}3.4 编写消费者2package com.gmtgo.demo.work;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import ponent;import java.io.IOException;/*** @author ⼤帅*/@Slf4j@Componentpublic class WorkConsumers2 {@RabbitListener(queues = "work_queue")public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);("我是消费信息2:{}", new String(message.getBody()));}}3.5 编写测试⽅法package com.gmtgo.demo.work;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/*** @author ⼤帅*/@RestController@RequestMapping(value = "rabbitMq")public class WorkRabbitMqController {@Autowiredprivate WorkProducer workProducer;@RequestMapping(value = "workQueueTest")public String workQueueTest() {workProducer.sendMessage();return "success";}}3.6 测试启动项⽬访问 workQueueTest访问地址结果:控制台打印,发现10条消息 偶数条消费者1获取,奇数条消费者2获取,并且平均分配。
RabbitMQ从零到集群⾼可⽤.NetCore(.NET5)-RabbitMQ简介和六种。
系列⽂章:⼀、RabbitMQ简介是⼀个开源的消息代理和队列服务器,⽤来通过普通协议在完全不同的应⽤之间共享数据,RabbitMQ是使⽤Erlang(⾼并发语⾔)语⾔来编写的,并且RabbitMQ是基于AMQP协议的。
1.1 AMQP协议Advanced Message Queuing Protocol(⾼级消息队列协议)1.2 AMQP专业术语:(多路复⽤->在同⼀个线程中开启多个通道进⾏操作)Server:⼜称broker,接受客户端的链接,实现AMQP实体服务Connection:连接,应⽤程序与broker的⽹络连接Channel:⽹络信道,⼏乎所有的操作都在channel中进⾏,Channel是进⾏消息读写的通道。
客户端可以建⽴多个channel,每个channel代表⼀个会话任务。
Message:消息,服务器与应⽤程序之间传送的数据,由Properties和Body组成.Properties可以对消息进⾏修饰,必须消息的优先级、延迟等⾼级特性;Body则是消息体内容。
virtualhost: 虚拟地址,⽤于进⾏逻辑隔离,最上层的消息路由。
⼀个virtual host⾥⾯可以有若⼲个Exchange和Queue,同⼀个Virtual Host ⾥⾯不能有相同名称的Exchange 或 Queue。
Exchange:交换机,接收消息,根据路由键转单消息到绑定队列Binding: Exchange和Queue之间的虚拟链接,binding中可以包换routing keyRouting key: ⼀个路由规则,虚拟机可⽤它来确定如何路由⼀个特定消息。
(如负载均衡)1.3 RabbitMQ整体架构ClientA(⽣产者)发送消息到Exchange1(交换机),同时带上RouteKey(路由Key),Exchange1找到绑定交换机为它和绑定传⼊的RouteKey的队列,把消息转发到对应的队列,消费者Client1,Client2,Client3只需要指定对应的队列名即可以消费队列数据。
RabbitMQ路由选择
1、绑定(Bindings)
类似下面的代码:
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定表示转发器与队列之间的关系。
我们也可以简单的认为:队列对该转发器上的消息感兴趣。
绑定可以附带一个额外的参数routingKey。
为了与避免basicPublish方法(发布消息的方法)的参数混淆,我们准备把它称作绑定键(binding key)。
下面展示如何使用绑定键(binding key)来创建一个绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定键的意义依赖于转发器的类型。
对于fanout类型,忽略此参数。
2、直接转发(Direct exchange)
上一篇的日志系统广播所有的消息给所有的消费者。
我们希望可以对其扩展,来允许根据日志的严重性进行过滤日志。
例如:我们可能希望把致命类型的错误写入硬盘,而不把硬盘空间浪费在警告或者消息类型的日志上。
之前我们使用fanout类型的转发器,但是并没有给我们带来更多的灵活性:仅仅可以愚蠢的转发。
我们将会使用direct类型的转发器进行替代。
direct类型的转发器背后的路由转发算法很简单:消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)完全匹配的队列。
图解:
上图,我们可以看到direct类型的转发器与两个队列绑定。
第一个队列与绑定键orange绑定,第二个队列与转发器间有两个绑定,一个与绑定键black绑定,另一个与green绑定键绑定。
这样的话,当一个消息附带一个选择键(routing key)orange发布至转发器将会被导向到队列Q1。
消息附带一个选择键(routing key)black或者green将会被导向到Q2.所有的其他的消息将会被丢弃。
3、多重绑定(multiple bindings)
使用一个绑定键(binding key)绑定多个队列是完全合法的。
如上图,一个附带选择键(routing key)的消息将会被转发到Q1和Q2。
4、发送日志(Emittinglogs)
我们准备将这种模式用于我们的日志系统。
我们将消息发送到direct类型的转发器而不是fanout类型。
我们将把日志的严重性作为选择键(routing key)。
这样的话,接收程序可以根据严重性来选择接收。
我们首先关注发送日志的代码:
像以前一样,我们需要先创建一个转发器:
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
然后我们准备发送一条消息:
channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());
为了简化代码,我们假定‘severity’是‘info’,‘warning’,‘error’中的一个。
5、订阅
接收消息的代码和前面的博客的中类似,只有一点不同:我们给我们所感兴趣的严重性类型的日志创建一个绑定。
StringqueueName = channel.queueDeclare().getQueue();
for(Stringseverity : argv)
{
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
6、完整的实例
发送端:EmitLogDirect.java
随机发送6条随机类型(routing key)的日志给转发器~~ 接收端:ReceiveLogsDirect.java
接收端随机设置一个日志严重级别(binding_key)。
我开启了3个接收端程序,两个准备接收error类型日志,一个接收info类型日志,然后运行发送端程序运行结果:
[x] Sent 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
[x] Sent 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
[x] Sent 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
[x] Sent 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
[x] Sent 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'
[x] Sent 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'
------------------------------------------------------------------------------------
[*] Waiting for error logs. To exit press CTRL+C
[x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
[x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
[x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
[x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
------------------------------------------------------------------------------------
[*] Waiting for error logs. To exit press CTRL+C
[x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
[x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
[x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
[x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
------------------------------------------------------------------------------------
[*] Waiting for info logs. To exit press CTRL+C
[x] Received 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'
[x] Received 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'
可以看到我们实现了博文开头所描述的特性,接收者可以自定义自己感兴趣类型的日志。
其实文章这么长就在说:发送消息时可以设置routing_key,接收队列与转发器间可以设置binding_key,接收者接收与binding_key与routing_key相同的消息。
本文作者:鸿洋。