rabbitmq+学习手册
- 格式:pdf
- 大小:438.37 KB
- 文档页数:15
Rabbitmq基本API使⽤⼀、⽣产者1. 创建ConnectionFactory⼯⼚(地址、⽤户名、密码、vhost)2. 创建Connection3. 创建信道(Channel)4. 创建 exchange(指定名称、类型-DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");、是否持久化)5. 发送消息(指定:exchange、发送的routingKey ,发送到的消息)基础的⽣产者:public class TestProducer {public final static String EXCHANGE_NAME = "direct_logs";public static void main(String[] args)throws IOException, TimeoutException {/* 创建连接,连接到RabbitMQ*/ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.112.131");connectionFactory.setVirtualHost("my_vhost");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = connectionFactory.newConnection();/*创建信道*/Channel channel = connection.createChannel();/*创建交换器*/channel.exchangeDeclare(EXCHANGE_NAME,"direct");//channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);/*⽇志消息级别,作为路由键使⽤*/String[] routekeys = {"king","queue","prince"};for(int i=0;i<3;i++){String routekey = routekeys[i%3];String msg = "Hellol,RabbitMq"+(i+1);/*发布消息,需要参数:交换器,路由键,其中以⽇志消息级别为路由键*/channel.basicPublish(EXCHANGE_NAME,routekey,null,msg.getBytes());System.out.println("Sent "+routekey+":"+msg);}channel.close();connection.close();}}View Code⼆、消费者1. 创建ConnectionFactory⼯⼚(地址、⽤户名、密码、vhost)2. 创建Connection3. 创建信道(Channel)4. 声明⼀个 exchange(指定名称、类型、是否持久化)5. 创建⼀个队列(指定:名称,是否持久化,是否独占,是否⾃动删除,其他参数)6. 队列、exchange通过routeKey进⾏绑定7. 消费者接收消息(队列名称,是否⾃动ACK)基本的消费者:public class TestConsumer {public static void main(String[] argv)throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.112.131");factory.setVirtualHost("my_vhost");factory.setUsername("admin");factory.setPassword("admin");// 打开连接和创建频道,与发送端⼀样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.exchangeDeclare(TestProducer.EXCHANGE_NAME,"direct");/*声明⼀个队列*/String queueName = "focuserror";channel.queueDeclare(queueName,false,false,false,null);/*绑定,将队列和交换器通过路由键进⾏绑定*/String routekey = "king";/*表⽰只关注error级别的⽇志消息*/channel.queueBind(queueName,TestProducer.EXCHANGE_NAME,routekey);System.out.println("waiting for message........");/*声明了⼀个消费者*/final Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("Received["+envelope.getRoutingKey()+"]"+message);}};/*消费者正式开始在指定队列上消费消息*/channel.basicConsume(queueName,true,consumer);}}View Code三、消息持久化1. exchange 需要持久化2. 发送消息设置参数为 MessageProperties.PERSISTENT_TEXT_PLAIN3. 队列需要设置参数为持久化1、//TODO 创建持久化交换器 durable=truechannel.exchangeDeclare(EXCHANGE_NAME,"direct",true);2、//TODO 发布持久化的消息(delivery-mode=2)channel.basicPublish(EXCHANGE_NAME,routekey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());3、//TODO 声明⼀个持久化队列(durable=true)// autoDelete=true 消费者停⽌了,则队列会⾃动删除//exclusive=true独占队列,只能有⼀个消费者消费String queueName = "msgdurable";channel.queueDeclare(queueName,true,false,false,null);四、如何⽀持事务(防⽌投递消息的时候消息丢失-效率特别低,不建议使⽤,可以使⽤⽣产者ACK机制)1. 启动事务2. 成功提交3. 失败则回滚//TODO//加⼊事务channel.txSelect();try {for(int i=0;i<3;i++){String routekey = routekeys[i%3];// 发送的消息String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());channel.basicPublish(EXCHANGE_NAME, routekey, true,null, message.getBytes());System.out.println("----------------------------------");System.out.println(" Sent Message: [" + routekey +"]:'"+ message + "'");Thread.sleep(200);}//TODO//事务提交channel.txCommit();} catch (IOException e) {e.printStackTrace();//TODO//事务回滚channel.txRollback();} catch (InterruptedException e) {e.printStackTrace();}View Code五、消费消息⼿动ACK,如果异常则使⽤拒绝的⽅式,然后异常消息推送到-死信队列批量ack的时候如果其中有⼀个消息出现异常,则会导致消息丢失(⽇志处理的时候可以使⽤批量)1 /*消费者正式开始在指定队列上消费消息,第⼆个参数false为⼿动应答*/channel.basicConsume(queueName,false,consumer);2 收到消息以后,⼿动应答数据接收成功channel.basicAck(envelope.getDeliveryTag(),false);3 收到消息,如果处理失败则拒绝消息:DeliveryTag是消息在队列中的标识channel.basicReject(envelope.getDeliveryTag(),false);4 决绝的参数说明//TODO Reject⽅式拒绝(这⾥第2个参数决定是否重新投递),不要重复投递,因为消息重复投递后处理可能依然异常//channel.basicReject(envelope.getDeliveryTag(),false);//TODO Nack⽅式的拒绝(第2个参数决定是否批量,第3个参数是否重新投递)channel.basicNack(envelope.getDeliveryTag(), false, true);View Code六、创建队列的参数解析:场景,延迟队列,保存带有时效性的订单,⼀旦订单过期,则信息会转移到死信队列//TODO /*⾃动过期队列--参数需要Map传递*/String queueName = "setQueue";Map<String, Object> arguments = new HashMap<String, Object>();arguments.put("x-expires",10*1000);//消息在队列中保存10秒后被删除//TODO 队列的各种参数/*加⼊队列的各种参数*/// autoDelete=true 消费者停⽌了,则队列会⾃动删除//exclusive=true独占队列,只能有⼀个消费者消费channel.queueDeclare(queueName,true,true, false,arguments);七、发送消息以后带有应答的队列1. 声明⼀个回应队列2. 声明⼀个回应消息的消费者3. 声明⼀个属性对象(指定队列,会唯⼀的id)4. ⽣产者发送消息给消费者(带着回应队列)5. 消费者接收到消息以后根据对应的信息,给予回应⽣产者端:1、//TODO 响应QueueName ,消费者将会把要返回的信息发送到该QueueString responseQueue = channel.queueDeclare().getQueue();//TODO 消息的唯⼀idString msgId = UUID.randomUUID().toString();2、/*声明了⼀个消费者*/final Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("Received["+envelope.getRoutingKey()+"]"+message);}};//TODO 消费者应答队列上的消息channel.basicConsume(responseQueue,true,consumer);3、//TODO 设置消息中的应答属性AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().replyTo(responseQueue).messageId(msgId).build();4、String msg = "Hello,RabbitMq";//TODO 发送消息时,把响应相关属性设置进去channel.basicPublish(EXCHANGE_NAME,"error",properties,msg.getBytes());View Code消费者端:String message = new String(body, "UTF-8");System.out.println("Received["+envelope.getRoutingKey()+"]"+message);//TODO 从消息中拿到相关属性(确定要应答的消息ID,)AMQP.BasicProperties respProp= new AMQP.BasicProperties.Builder().replyTo(properties.getReplyTo()).correlationId(properties.getMessageId()).build();//TODO 消息消费时,同时需要⽣作为⽣产者⽣产消息(以OK为标识)channel.basicPublish("", respProp.getReplyTo() ,respProp ,("OK,"+message).getBytes("UTF-8"));⼋、死信队列 - 下列消息会放到死信队列1. 消息被否定确认,使⽤ channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
RabbitMQ-常⽤消息队列之:Worker模式(⼀对多模式)Producer:消息的⽣产者(发送消息的程序)。
Queue:消息队列,理解为⼀个容器,⽣产者向它发送消息,它把消息存储,等待消费者消费。
Consumer:消息的消费者(接收消息的程序)。
此处我们假设 Consumer1、Consumer2、Consumer3 分别为完成任务速度不⼀样快的消费者,这会引出此模式的⼀个重点问题。
如何理解: ⼯作模式由图可以看出,就是在简单队列模式的基础上,增加了多个消费者,也就是让多个消费者绑定同⼀个队列,共同去消费,这样能解决简单队列模式中,如果⽣产速速远⼤于消费速度,⽽导致的消息堆积现象。
因为消息被消费后就会消失,所以不必担⼼任务会重复执⾏。
代码实现: 注:⼯作队列模式有两种1. 轮询模式:每个消费者均分消息2. 公平分发模式(能者多劳):按能⼒分发,处理速度快的分发的多,处理速度慢的分发的少Worker模式 Worker模式其实就是⼀对多模式,我们定义两个消费者来看看效果: 默认情况下,RabbitMQ会顺序的将message发给下⼀个消费者。
每个消费者会得到平均数量的message。
这种⽅式称之为round-robin(轮询)。
但是很多情况下并不希望消息平均分配,⽽是要消费快的多消费,消费少的少消费.还有很多情况下⼀旦其中⼀个宕机,那么另外接收者的⽆法接收原本这个接收者所要接收的数据。
我们修改其中⼀个消费者代码,让其等待5秒。
在等待中停⽌运⾏看看效果。
consumer.Received += (ch, ea) =>{string message = Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"接受到消息:{message}");Thread.Sleep(5000);}; 当消费者宕机后消费者1并没有接受宕机后的数据。
rabbitmq的使用方法RabbitMQ是一个开源的消息代理软件,用于实现异步消息传递。
以下是使用RabbitMQ的一些基本方法:1. 安装和配置:首先,你需要从RabbitMQ的官网下载并安装RabbitMQ 服务器。
安装完成后,你可以通过浏览器访问RabbitMQ的管理界面,进行基本的配置。
2. 创建队列:在RabbitMQ中,消息被存储在队列中。
你可以使用RabbitMQ的管理界面或者通过编程的方式创建队列。
例如,使用Python 的pika库,你可以这样创建一个队列:```pythonimport pikaconnection = (('localhost'))channel = ()_declare(queue='hello')()```3. 发送消息:一旦你创建了队列,你就可以开始发送消息到这个队列。
同样使用pika库,你可以这样发送消息:```pythonimport pikaconnection = (('localhost'))channel = ()_publish(exchange='', routing_key='hello', body='Hello World!') ()```4. 接收消息:要接收消息,你需要创建一个消费者来从队列中获取消息。
消费者可以是任何能够处理RabbitMQ消息的应用程序。
例如,你可以创建一个Python消费者来接收消息:```pythonimport pikaconnection = (('localhost'))channel = ()_declare(queue='hello')def callback(ch, method, properties, body):print(f" [x] Received {body}")_consume(queue='hello', on_message_callback=callback,auto_ack=True)print(' [] Waiting for messages. To exit press CTRL+C')_consuming()```5. 确认消息处理:在RabbitMQ中,你可以选择自动确认(auto_ack)或手动确认(manual_ack)消息处理。
RabbitMQ主要学习步骤1.了解 RabbitMQ的基本概念:学习 RabbitMQ的基本语法、消息类型、监听事件等。
2.学习 RabbitMQ的实现原理:了解 RabbitMQ的体系结构,包括队列、消息循环、线程池等。
3.学习 RabbitMQ的使用:通过阅读相关书籍、教程和实践项目,了解RabbitMQ的具体应用。
4.练习编写实际的项目:通过编写实际的项目,掌握 RabbitMQ的应用场景和实现技巧。
5.了解 RabbitMQ与其他 Java平台集成:通过阅读相关书籍、教程和实践项目,了解 RabbitMQ与其他 Java平台集成的方式和方法。
RabbitMQ详细学习步骤RabbitMQ是一款分布式消息队列,用于实时传输文件和数据。
下面是一个详细的学习步骤,以帮助您学会 abbitMQ:1、选择一款合适的分布式系统开发工具:选择一款适合进行分布式数据处理和传输的工具,如 Git、 ExtJS等。
2、安装 abbitMQ:通过安装 abbitMQ来初始化系统,并确保它们支持各种分布式系统。
3、配置 abbitMQ以支持各种分布式系统,如 JVM、 ElasticSearch等。
4、创建一个或多个进程来运行 abbitMQ,并配置其相关参数以支持分布式数据处理和传输。
5、编写代码:使用 abbitMQ编写各种类型的数据传输代码,并练习根据不同的用途灵活使用 abbitMQ。
6、测试和运行项目:通过测试和运行项目来验证 abbitMQ的功能和性能,并逐渐提高您的技能水平。
7、了解 abbitMQ与其他 Java平台集成:通过了解 abbitMQ与其他 Java 平台集成,如 ElasticSearch、 ExtJS等,以提高您的开发效率。
8、建议先学习一下 Java语言和分布式系统知识,这对学习 abbitMQ非常有帮助。
rabbitmq basicconsume方法的arguments参数RabbitMQ是一款广泛使用的消息队列系统,它提供了许多功能强大的特性,如消息传递、可靠性保证、消息持久化等。
在RabbitMQ中,使用BasicConsume方法可以开始消费队列中的消息。
这个方法接受一些重要的参数,以下是关于这些参数的详细解释和用法指南。
一、参数详解1. consumer_tag:一个唯一标识符,用于标记消费者实例。
每当消费者从队列中消费完一条消息后,它的标识符都需要被重置为空字符串。
2. queue:要消费的队列名称。
这是必须的参数,因为RabbitMQ只会向指定的队列发送消息。
3. autoack:一个布尔值,用于决定是否在收到消息后自动确认并回传给服务器。
如果设置为true,那么消费者在处理完消息后需要调用BasicDeliver方法的第二个参数中的ack方法进行确认。
4. exclusive:一个布尔值,用于决定是否创建一个排他性(Exclusive)的消费者。
如果设置为true,那么这个消费者将只能消费这个队列的消息,而不会影响到其他消费者。
5. no_local:一个布尔值,用于决定是否阻止从本地队列发送消息到这个消费者。
通常这个参数是关闭的(false)。
6. wait:一个布尔值,决定是否等待消费者订阅的队列是否有新消息产生。
当设置为true时,程序将一直等待直到队列中有新消息可用为止。
7. auto_detect:这是一个指示器,当设置为true时,RabbitMQ将自动检测队列的变化并重新订阅。
二、使用示例以下是一个使用BasicConsume方法的简单示例:```pythonimport pikaconnection =pika.BlockingConnection(pika.ConnectionParameters('localhost' ))channel = connection.channel()channel.queue_declare(queue='my_queue')consumer_tag = channel.basic_consume(queue='my_queue', auto_ack=True)while True:# 获取并处理消息...connection.process_data_events()# 当处理完一条消息后,需要调用ack方法确认消息已被处理if consumer_tag:channel.basic_ack(consumer_tag,delivery_tag='<delivery-tag>')else:breakconnection.close()```在这个示例中,我们首先创建了一个到本地RabbitMQ服务器的连接,然后声明了一个名为'my_queue'的队列。
rabbitmq使用手册RabbitMQ是一种开源的消息队列中间件,采用AMQP协议,被广泛应用于构建可靠、高效的分布式系统。
本手册将详细介绍RabbitMQ 的安装、配置、使用和常见问题解决方案,帮助读者快速上手使用RabbitMQ。
第一章安装与配置1.1 环境准备在开始安装RabbitMQ之前,需要确保系统满足以下要求:操作系统(例如Linux、Windows)、Erlang运行时环境以及RabbitMQ软件包。
1.2 安装RabbitMQ按照文档提供的方式,在所选的操作系统上安装RabbitMQ。
安装过程中需注意版本兼容性和安全配置。
1.3 配置RabbitMQ在安装完成后,需要对RabbitMQ进行适当的配置。
主要包括网络配置、认证与授权、虚拟主机、交换机和队列的创建等。
第二章消息发布与订阅2.1 消息生产者通过使用RabbitMQ的API,开发者可以编写生产者代码将消息发布到RabbitMQ的交换机上。
这里需要注意消息的序列化和指定交换机名称。
2.2 消息消费者RabbitMQ的消费者通过订阅交换机的队列来接收消息,可以使用RabbitMQ的API编写消费者代码,并实现消息的处理逻辑。
2.3 消息确认机制RabbitMQ提供了消息的确认机制,确保消息在传输过程中的可靠性。
开发者可以选择隐式确认或显式确认来保证消息的消费状态。
第三章消息路由与过滤3.1 路由模式RabbitMQ支持多种路由模式,如直接路由、主题路由和广播路由。
开发者可以根据实际需求选择最适合的路由模式。
3.2 消息过滤通过使用RabbitMQ的消息过滤功能,可以根据消息的属性进行过滤,只有满足条件的消息才会被消费者接收。
第四章高级特性与扩展4.1 持久化使用RabbitMQ的持久化机制,可以确保消息在服务器重启后依然存在,防止消息丢失。
4.2 集群与高可用通过搭建RabbitMQ集群,可以提高系统的可用性和扩展性。
在集群中,消息将自动在节点之间进行复制。
C#教程之C#教程之RabbitMQ基础⼊门篇作者:warren来源:/声明:原创博客请在转载时保留原⽂链接或者在⽂章开头加上本⼈博客地址,如发现错误,欢迎批评指正。
凡是转载于本⼈的⽂章,不能设置打赏功能,如有特殊需求请与本⼈联系!下载安装ErlangRabbitMQ启动RabbitMQ管理平台插件DOS下进⼊到安装⽬录\sbin,执⾏以下命令rabbitmq-plugins enable rabbitmq_management当出现以下结果时,重启RabbitMQ服务set 3 plugins.Offline change; changes will take effect at broker restart.注意:以下为C#代码,请引⽤NuGet包:RabbitMQ.Client回到顶部参考⽂章RabbitMQ快速⼊门回到顶部名词解析P(Publisher):⽣产者C(Consumer):消费者Channel:信道Queue:队列Exchange:信息交换机回到顶部简单演⽰信息发送端static void Send(){//1. 实例化连接⼯⼚var factory = new ConnectionFactory() { HostName = "localhost" };//2. 建⽴连接using (var connection = factory.CreateConnection()){//3. 创建信道using (var channel = connection.CreateModel()){//4. 声明队列channel.QueueDeclare(queue: "rabbitmq",durable: false,exclusive: false,autoDelete: false,arguments: null);//5. 构建字节数据包var message = "Hello RabbitMQ!";var body = Encoding.UTF8.GetBytes(message);//6. 发送数据包channel.BasicPublish(exchange: "",routingKey: "rabbitmq",basicProperties: null,body: body);Console.WriteLine(" [x] Sent {0}", message);}}}信息接收端static void Receive(){//1. 实例化连接⼯⼚var factory = new ConnectionFactory() { HostName = "localhost" };//2. 建⽴连接using (var connection = factory.CreateConnection()){//3. 创建信道using (var channel = connection.CreateModel()){//4. 声明队列channel.QueueDeclare(queue: "rabbitmq",durable: false,exclusive: false,autoDelete: false,arguments: null);//5. 构造消费者实例var consumer = new EventingBasicConsumer(channel);//6. 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine(" [x] Received {0}", message);};//7. 启动消费者channel.BasicConsume(queue: "rabbitmq",autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}}回到顶部轮询调度P⽣产的多个任务进⼊到队列中,多个C间可以并⾏处理任务。
rabbitmq的用法
RabbitMQ是一种开源的消息队列中间件,用于在分布式系统中传递和存储消息。
以下是两种常见的rabbitmq用法:
1. 用作消息传递中间件:
RabbitMQ通过将消息发送到队列中并使用异步方式将其传递给接收者,实现了
松耦合的系统间通信。
发送者将消息发布到交换机,然后交换机将消息路由到一个或多个队列,接收者可以订阅一个或多个队列以接收消息。
这种模式非常适合异步处理、解耦和系统扩展。
2. 用作任务队列:
RabbitMQ还可以用作任务队列,将任务从一个应用程序分发给多个工作进程。
发送者将任务作为消息发布到队列,多个工作进程消费队列中的消息并执行相应的任务。
这种模式通常用于在高并发的情况下平衡任务负载和提高系统的可靠性。
无论是作为消息传递中间件还是任务队列,RabbitMQ的使用步骤大致相同:首先需要安装和配置RabbitMQ服务器,然后使用相关的客户端库在应用程序中进
行消息的发送和接收。
在发送消息时,可以指定消息的目标队列或交换机,也可以设置其他的消息属性。
在接收消息时,可以按照特定的策略进行消息的消费和处理。
需要注意的是,RabbitMQ还提供了许多高级特性,比如消息持久化、消息确认机制、消息优先级等,根据具体的需求和场景,可以选择合适的配置和使用方式。
RabbitMQ-管理界⾯介绍OverviewOverview 概览connections:⽆论⽣产者还是消费者,都需要与 RabbitMQ 建⽴连接后才可以完成消息的⽣产和消费,在这⾥可以查看连接情况channels:通道,建⽴连接后,会形成通道,消息的投递获取依赖的通道Exchanges:交换机,⽤来实现消息的路由Queues:队列,就是消息队列,消息存放在队列中,等待消费,消费后会被移除队列AdminAdmin →⽤户和虚拟主机的管理⾯板添加⽤户上⾯的 Tags 选项,其实是指定⽤户的⾓⾊,可选的有以下⼏个:超级管理员(administrator)可登陆管理控制台,可查看所有的信息,并且可以对⽤户,策略(policy)进⾏操作监控者(monitoring)可登陆管理控制台,同时可以查看 RabbitMQ 节点的相关信息(进程数,内存使⽤情况,磁盘使⽤情况等)策略制定者(policymaker)可登陆管理控制台, 同时可以对 policy 进⾏管理。
但⽆法查看节点的相关信息普通管理者(management)仅可登陆管理控制台,⽆法看到节点信息,也⽆法对策略进⾏管理其他:⽆法登陆管理控制台,通常就是普通的⽣产者和消费者创建虚拟主机为了让各个⽤户可以互不⼲扰的⼯作,RabbitMQ 添加了虚拟主机(Virtual Hosts)的概念其实就是⼀个独⽴的访问路径,不同⽤户使⽤不同路径,各⾃有⾃⼰的队列、交换机,互相不会影响对⽅如下图中的步骤创建即可,填写好相关的 Virtual Hosts 信息添加即可:绑定虚拟主机和⽤户创建好虚拟主机,我们还要给⽤户添加访问权限点击添加好的虚拟主机:会进⼊到虚拟机的设置界⾯然后就可以添加了,可以选择不同的⽤户设置不同的权限选择好之后点击 Set permission 即可设置好:创建v-it6666虚拟主机点击添加之后可以看到添加成功了如下图:创建it6666⽤户可以看到添加成功,但是现在不能访问任何的虚拟主机如下图:分配权限再次点击 Admin 即可返回列表页⾯如下图:可以看到已经可以了。
rabbitmq的memory details详细解读概述说明1. 引言1.1 概述在当今高并发、大规模数据处理的背景下,消息队列作为一种重要的通信机制,被广泛应用于各种系统中。
RabbitMQ作为最流行的开源消息中间件之一,具有可靠、高性能和可扩展性等特点,在分布式系统架构中起到了至关重要的作用。
本文将详细讨论RabbitMQ内存使用的细节,并探讨如何对其进行调优,以提高其性能和稳定性。
同时也会介绍在集群环境下如何管理和同步内存,以实现高可用性和平衡内存容量。
1.2 文章结构本文共分为五个部分:引言、RabbitMQ内存细节详解、RabbitMQ内存调优、RabbitMQ集群中的内存管理以及结论与展望。
- 第一部分是引言部分,主要对文章的背景和内容进行简要介绍。
- 第二部分将详细介绍RabbitMQ的基本概念与原理,并深入研究其内存管理机制。
- 第三部分将重点讨论如何通过设置和管理队列的内存限制来调优RabbitMQ,同时还会探讨消息持久化与内存使用之间的权衡。
- 第四部分将介绍在RabbitMQ集群中的内存管理策略,包括集群模式下的内存分配机制以及集群节点间的内存同步策略,并探讨高可用性和内存容量之间的平衡考虑。
- 最后一部分是总结本文重点要点,并对未来发展进行展望。
1.3 目的本文旨在深入了解RabbitMQ的内存使用细节,帮助读者全面理解RabbitMQ 在消息传递过程中所涉及到的内存管理机制。
同时,通过探讨调优方法和集群环境下的内存管理策略,读者将能够更好地使用和配置RabbitMQ,以提高系统性能并保证其稳定运行。
通过阅读本文,读者将获得以下收益:- 对RabbitMQ的基本原理和概念有深刻理解;- 掌握RabbitMQ内存调优方法和技巧;- 在集群环境中有效管理和同步内存的能力;- 对未来RabbitMQ发展趋势有一定洞察力。
通过这样的研究与实践,可以使读者更好地应用RabbitMQ来构建可靠、高性能和可扩展的系统。
rabbitmq-c接⼝参数说明⽂档关于rabbitmq-c消息队列接⼝参数发布端amqp_new_connection()声明⼀个新的 amqp_connection,简称为 connamqp_tcp_socket_new(conn)创建⼀个 TCP socket:conn 为先前声明的 amqp_conneciton,函数返回值记为 socketamqp_socket_open(socket, hostname, port)打开 TCP socket,获取 socket 值为 status。
其中:socket — 为先前创建的 TCP;hostname — 为rabbitmq server 主机;port — 为其监听端⼝amqp_login(amqp_connection_state_t state, char const vhost, int channel_max, int frame_max, int heartbeat, int sasl_method, …)⽤于登录rabbitmq sever,主要⽤于进⾏权限管理:state — 如前⽂ conn(amqp_connection);vhost — 虚拟主机;chnnel_max — 最⼤连接数;frame_max — 和客户端通信时允许的最⼤帧数, 默认值131072(↑提⾼吞吐,↓降低时延);heartbeat — ⼼跳帧,两次⼼跳间的秒数,heartbeat超时值定义了RabbitMQ及其client库在多久之后认为TCP连接不可到达。
默认为0;sasl_method — SSL认证;amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel)⽤于关联 conn 和 channel:state — conn(amqp_connection);channel — 进⾏RPC的通道;amqp_basic_publish( amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange,amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, structamqp_basic_properties_t_ const properties, amqp_bytes_t body)发布消息到代理(通过 routing key 发布消息到 exchange 上):state — conn;channel — 通道标识符;exchange — 代理上被发布消息的 exchange;routing_key — 发布消息时需要使⽤的路由密匙;mandatory — 此标志指⽰服务器如果⽆法将消息路由到队列,该如何反应。
RabbitMQ控制台详解overview→Totals所有队列的阻塞情况Ready:待消费的消息总数Unacked:待应答的消息总数Total:总数 Ready+UnackedPublish:producter pub消息的速率。
Publisher confirm:broker确认pub消息的速率。
Deliver(manual ack):customer⼿动确认的速率。
Deliver( auto ack):customer⾃动确认的速率。
Consumer ack:customer正在确认的速率。
Redelivered:正在传递'redelivered'标志集的消息的速率。
Get (manual ack):响应basic.get⽽要求确认的消息的传输速率。
Get (auto ack):响应于basic.get⽽发送不需要确认的消息的速率。
Return:将basic.return发送给producter的速率。
Disk read:queue从磁盘读取消息的速率。
Disk write:queue从磁盘写⼊消息的速率。
整体⾓⾊的个数Connections:client的tcp连接的总数。
Channels:通道的总数。
Exchange:交换器的总数。
Queues:队列的总数。
Consumers:消费者的总数。
Overview→Nodesbroker的属性Name:broker名称File descriptors:broker打开的⽂件描述符和限制。
Socket descriptors:broker管理的⽹络套接字数量和限制。
当限制被耗尽时,RabbitMQ将停⽌接受新的⽹络连接。
Erlang processes:erlang启动的进程数。
Memory:当前broker占⽤的内存。
Disk space:当前broker占⽤的硬盘。
Uptime:当前broker持续运⾏的时长。
Info:集群的信息。