RocketMq消息队列实施方案-完整版
- 格式:docx
- 大小:2.23 MB
- 文档页数:21
rocketmq 工作流程RocketMQ是阿里巴巴集团开源的一款分布式消息中间件,它具有高可靠、高性能、可扩展的特点,被广泛应用于解决大规模分布式系统中的消息通信问题。
本文将从RocketMQ的工作流程方面进行介绍。
RocketMQ的工作流程可以分为生产者、消息队列、消费者三个主要部分。
生产者负责向消息队列发送消息,消息队列负责存储和分发消息,消费者负责从消息队列中获取消息并进行处理。
生产者将消息发送到RocketMQ的消息队列中。
生产者首先需要与RocketMQ的NameServer进行连接,NameServer负责维护消息队列的元数据信息。
生产者在与NameServer建立连接后,会根据指定的Topic将消息发送到特定的消息队列中。
每个Topic可以分为多个队列,这些队列可以分布在不同的Broker上。
生产者发送消息时,可以选择同步发送或异步发送。
同步发送是指生产者发送消息后会等待消息发送结果,而异步发送则是生产者发送消息后不会等待发送结果。
发送消息时,生产者可以指定消息的Tag、Key 和延迟级别等属性。
消息队列接收到生产者发送的消息后,会将消息存储在磁盘上,并返回一个消息ID给生产者。
消息队列会根据消息的Tag、Key等属性将消息分发到不同的队列中,以便消费者能够根据自己的需求进行消费。
消费者从RocketMQ的消息队列中获取消息进行消费。
消费者首先需要与NameServer进行连接,获取消息队列的元数据信息。
然后,消费者根据指定的Topic和消费者组进行订阅。
消费者可以选择顺序消费或并发消费。
顺序消费是指消费者按照消息的顺序进行消费,而并发消费则是消费者可以同时消费多个消息。
消费者可以根据自己的需求选择适合的消费模式。
消费者从消息队列中获取消息后,会对消息进行处理。
处理的逻辑可以根据业务需求进行编写,比如对消息进行解析、计算、存储等操作。
消费者处理完消息后,可以选择提交消费结果或者回滚消费。
Python:Rocketmq消息队列使⽤rocketmq可以与kafka等⼀起使⽤,⽤于实时消息处理。
安装rocketmq:⽣产消息producer:from rocketmq.client import Producer, Messageimport jsonproducer = Producer('PID-test')producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx') #rocketmq队列接⼝地址(服务器ip:port)producer.start()msg_body = {"id":"001","name":"test_mq","message":"abcdefg"}ss = json.dumps(msg_body).encode('utf-8')msg = Message('topic_name') #topic名称msg.set_keys('xxxxxx')msg.set_tags('xxxxxx')msg.set_body(ss) #message bodyretmq = producer.send_sync(msg)print(retmq.status, retmq.msg_id, retmq.offset)producer.shutdown()其中:设置ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')当只有单⼀服务器时,格式是上⾯这个;当有多个服务器地址(集群模式)时,可以使⽤:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")不过以下这种⽅式本⼈测试不通过:producer.set_namesrv_addr(["xxx.xxx.xxx.xxx:xxxxx","xxx.xxx.xxx.xxx:xxxxx","xxx.xxx.xxx.xxx:xxxxx"])如果使⽤pandas数据,pandas数据可以直接转换some_df.to_json(orient='records').encode('utf-8'),然后放⼊body中发送。
python实现的消息队列RocketMQ客户端使⽤rocketmq-python 是⼀个基于 rocketmq-client-cpp 封装的 RocketMQ Python 客户端。
⼀、Producer#coding:utf-8import jsonfrom rocketmq.client import Producer, Messageproducer = Producer('PID-001') # 实例化Producer对象,指定group-id(可任意取名)producer.set_namesrv_addr('xxxxxx:xx') # rocketmq队列接⼝地址(服务器ip:port)producer.start() # 开启# 实例化消息对象,需要指定应⽤名:topic_namemsg = Message('your_topic_name') # 实例化消息对象时,传⼊topic名称,⼀个应⽤尽可能⽤⼀个Topic# 指定消息的keysmsg.set_keys('your_keys') # 业务层⾯的唯⼀标识码,⽅便将来定位消息丢失问题。
服务器会为每个消息创建索引(哈希索引),应⽤可以通过topic,key来查询这条消息内容,以及消息被谁消费。
# 指定消息tagsmsg.set_tags('your_tags') # 消息⼦类型⽤tags来标识,tags可以由应⽤⾃由设置。
#指定消息体(内容)msg_body = {'name':'laowang','age':28}body = json.dumps(msg_body).encode('utf-8')msg.set_body(body) # 传⼊消息体(json字节串)# 向队列发送消息ret = producer.send_sync(msg)print(f'status:{ret.status}') # 0表⽰OKprint(f'msg_id:{ret.msg_id}') # 消息id,同消费者获取到的消息idprint(f'offset:{ret.offset}') # 偏移量,默认从0开始,1,2。
消息队列实施方案1、背景异步解耦合、给前端系统提供最高效的反应2、常见消息队列对比2、1 ActiveMqActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规的JMS Provider实现优点:Java语言支持集群模式缺点:性能在消息中间件中处于下游2、2 RabbitmqRabbitmq是基于AMQP使用erlang语言实现的消息队列系统优点:1、完整的消息队列系统,支持多种消息队列模式,包括竞争消费;2、支持集群模式,扩展集群容量和性能比较方便,集成了集群的监控和管理;3、支持消息的持久化;缺点:1、需要学习比较复杂的接口和协议,比较耗费时间;2、性能不是特别理想大概在1wqps左右;3、使用Erlang语言,语言基础;2、3 KafkaKafka 是LinkedIn 开发的一个高性能、分布式的消息发布订阅系统。
优点:1、分布式集群可以透明的扩展,增加新的服务器进集群。
2、高性能。
单机写入TPS约在百万条/秒3、容错。
数据都会复制到几台服务器上。
缺点:1、复杂性。
Kafka需要zookeeper 集群的支持,Topic通常需要人工来创建,部署和维护较一般消息队列成本更高定位于日志传输、存在消息丢失的肯能、消息乱序3、消息发送错误无重试2、4 RocketMQRockerMq 是阿里公司中间件团队参考Kafka思想,用Java语言实现的消息传输系统优点:1、较高性能,单机写入TPS单实例约7万条/秒2、容错,多种集群模式、可以解决容错问题3、消息重试发送4、顺序消息可以严格执行缺点:1、消息重复、消费端需要做去重操作2、5 选用结论从项目业务与团队技术偏向考虑,我们应该需要一种数据安全性比较高,保证每个消息都会被执行;有容错机制、支持集群模式高可用;性能不错,可以在毫秒级处理消息;支持顺序消息的消息中间件,RockerMq 可以满足这些要求。
3、RockerMq简介3、1 RockerMq 产品介绍参考阿里公司提供的《RocketMQ 开发指南》,最新版针对v3.2.43、2 RockerMq集群3、2、1 部署方式Rockermq共有四种部署方式,分别是:1、单个Master一旦Broker 重启或者宕机时,会导致整个服务不可用2、多Master 模式一个集群无Slave,全是Master,例如2 个Master 戒者3 个Master优点:1、配置简单,2、容错,单个Master 宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由于RAID10 磁盘非常可靠,在同步刷盘时消息不会丢,异步刷盘丢失少量消息,3、性能最高。
rocketmq 事务消息实现原理RocketMQ提供了基于事务的消息发送功能,可以保证消息的原子性操作,即要么所有消息都能够发送成功,要么全部失败。
下面介绍一下RocketMQ事务消息的实现原理。
1. 事务消息的概念RocketMQ的事务消息指的是一组消息,其中包含了多条待发送的消息。
当这组消息被发送时,如果其中任何一条消息发送失败,则整个事务都会被回滚,即所有消息都会被取消发送。
2. 事务消息的发送过程RocketMQ的事务消息发送过程可以分为以下几个步骤:- 发送消息:首先,生产者将要发送的消息发送到RocketMQ的事务消息队列中。
- 消息确认:RocketMQ事务消息队列接收到消息后,会向生产者发送一个确认消息,表示该消息已经被接收。
- 发送事务消息:生产者收到确认消息后,会将所有待发送的消息打包成一个事务消息,并发送给RocketMQ的事务消息服务。
- 事务消息的处理:RocketMQ的事务消息服务会将事务消息发送给所有的Broker节点,并等待所有Broker节点的响应。
如果所有Broker 节点都能够成功接收到事务消息,则事务消息服务会将该事务消息发送给所有的Broker节点。
- 消息确认:当所有的Broker节点都成功接收到事务消息后,RocketMQ的事务消息服务会向生产者发送一个事务消息确认消息,表示该事务消息已经被成功发送。
- 事务消息的回滚:如果在任何时候出现了问题,例如某个Broker节点无法接收到事务消息,或者某个Broker节点无法成功存储消息,则RocketMQ的事务消息服务会将该事务消息回滚,即所有待发送的消息都会被取消发送。
3. 事务消息的实现原理RocketMQ的事务消息实现原理主要依赖于两阶段协议(2PC),以及RocketMQ的消息存储机制。
- 2PC协议2PC协议是一种用于协调参与者和协调者之间的事务提交和回滚的协议。
在RocketMQ的事务消息实现中,生产者扮演协调者的角色,而Broker节点则扮演参与者的角色。
消息队列的设计⽅案构建消息队列的整体思路设计消息队列的整体思路是先创建⼀个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。
利⽤RPC将数据流串起来。
然后考虑RPC的⾼可⽤性,尽量做到⽆状态,⽅便⽔平扩展。
之后考虑如何承载消息堆积,然后在合适的时机投递消息,⽽处理堆积的最佳⽅式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。
为了实现⼴播功能,我们必须要维护消费关系,可以利⽤zookeeper/config server等保存消费关系。
当然也并⾮每个消息队列的设计都是有broker的,broker(消息队列的服务端)的作⽤是对消息进⾏转存,以便在更合适的时间进⾏投递。
消息队列的基本功能的实现1.RPC通信协议其实消息队列⽤接地府⼀点的话来讲,就是将⽣产者producer产⽣发送给consumer消费者的⼀次RPC,先⾏发送到了消息队列中做暂存,再由消息队列在⼀个合适的时间发送给消费者进⾏消费,这样,⼀次RPC便被转为了两次的RPC,所以我们必须使⽤/⾃⼰实现⼀个RPC框架,⾃⼰实现RPC框架如果并⾮是对性能的极致追求,属实是没有必要的(⽽且个⼈实现的效果估计很难达到已有的成熟实现的⽔平),⽽已经有的RPC框架可以使⽤:Dubbo或者Thrift等成熟的实现。
2.对消息的存储(存储⼦系统的选择)为了满⾜我们错峰/流控/最终可达等⼀系列需求,把消息存储下来,然后选择时机投递就是我们broker的意义。
⽽这个存储⼜可以分为持久化和⾮持久化两种⽅式,持久化能更⼤程度地保证消息的可靠性(不易失)并且⼀般情况下可⽤存储空间都⽐较⼤(外存显然会⽐内存的存储容量⼤)。
但是很多消息对于投递性能的要求⼤于可靠性的要求,且数量极⼤(如⽇志)。
此时我们也可⽤⾮持久化的⽅式,将其缓存在内存,然后进⾏投递。
rocketmq工作流程RocketMQ是一个基于Java编写的开源消息队列系统。
它是由阿里巴巴集团开发的,主要用于可靠的异步通信、高扩展性以及分布式架构。
RocketMQ工作流程RocketMQ主要分为Producer范畴和Consumer范畴,Producer负责消息发送,而Consumer负责接收消息。
下面将详细介绍RocketMQ 的工作流程:1、Producer将消息发送到BrokerProducer将消息发送到RocketMQ集群。
RocketMQ集群中所有的Broker节点都可以接收消息,然后分发给对应的Consumer。
2、Broker存储消息Broker节点收到Producer发送的消息后,会存储到本地硬盘消息存储区中。
在存储消息的时候,会根据消息的主题(Topic)和标签(Tag)进行分类和存储。
3、Consumer订阅消息Consumer通过指定主题和标签来订阅消息。
当有新消息到达时,Broker会根据订阅关系将消息推送给对应的Consumer。
4、Broker将消息推送给Consumer当Broker将消息推送给Consumer时,会根据Consumer拉取消息的类型来选择消息推送方式:- Push模式:Broker直接将消息推送到Consumer客户端,由客户端来接收和处理消息。
- Pull模式:Broker通知Consumer有新消息,等待Consumer客户端主动来拉取消息。
由客户端主动拉取和处理消息。
5、Consumer消费消息Consumer接收到消息后,会进行相应的处理。
处理结束后,Consumer向Broker发送确认消息,表示该消息已经被处理完毕。
6、Broker删除确认消息Broker收到确认消息后,会将该消息从本地存储区删除。
这样,下次推送消息的时候,就不会再推送已经被处理完的消息了。
总结一下,RocketMQ的工作流程就是Producer发送消息到Broker,Broker存储消息,Consumer订阅消息,Broker将消息推送给Consumer,Consumer消费消息,Broker删除确认消息。
关于消息队列的使⽤⽅法(RocketMQ)⼀、消息队列概述消息队列中间件是分布式系统中重要的组件,主要解决应⽤解耦,异步消息,流量削锋等问题,实现⾼性能,⾼可⽤,可伸缩和最终⼀致性架构。
⽬前使⽤较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ⼆、消息队列应⽤场景以下介绍消息队列在实际应⽤中常⽤的使⽤场景。
异步处理,应⽤解耦,流量削锋和消息通讯四个场景。
2.1异步处理场景说明:⽤户注册后,需要发注册邮件和注册短信。
传统的做法有两种 1.串⾏的⽅式;2.并⾏⽅式a、串⾏⽅式:将注册信息写⼊数据库成功后,发送注册邮件,再发送注册短信。
以上三个任务全部完成后,返回给客户端。
b、并⾏⽅式:将注册信息写⼊数据库成功后,发送注册邮件的同时,发送注册短信。
以上三个任务完成后,返回给客户端。
与串⾏的差别是,并⾏的⽅式可以提⾼处理的时间假设三个业务节点每个使⽤50毫秒钟,不考虑⽹络等其他开销,则串⾏⽅式的时间是150毫秒,并⾏的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是⼀定的,假设CPU1秒内吞吐量是100次。
则串⾏⽅式1秒内CPU可处理的请求量是7次(1000/150)。
并⾏⽅式处理的请求量是10次(1000/100)⼩结:如以上案例描述,传统的⽅式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。
如何解决这个问题呢?引⼊消息队列,将不是必须的业务逻辑,异步处理。
改造后的架构如下:按照以上约定,⽤户的响应时间相当于是注册信息写⼊数据库的时间,也就是50毫秒。
注册邮件,发送短信写⼊消息队列后,直接返回,因此写⼊消息队列的速度很快,基本可以忽略,因此⽤户的响应时间可能是50毫秒。
因此架构改变后,系统的吞吐量提⾼到每秒20 QPS。
⽐串⾏提⾼了3倍,⽐并⾏提⾼了两倍。
2.2应⽤解耦场景说明:⽤户下单后,订单系统需要通知库存系统。
传统的做法是,订单系统调⽤库存系统的接⼝。
可靠消息最终⼀致性【本地消息表、RocketMQ事务消息⽅案】⼀、可靠消息最终⼀致性事务概述事务发起⽅(消息⽣产⽅)将消息发给消息中间件,事务参与⽅从消息中间件接收消息,事务参与⽅(消息消费⽅)和消息中间件之间都是通过⽹络通信,由于⽹络通信的不确定性会导致分布式事务问题。
因此可靠消息最终⼀致性⽅案要解决以下⼏个问题:【1】本地事务与消息发送的原⼦性问题:事务发起⽅在本地事务执⾏成功后消息必须发出去,否则就丢弃消息。
即实现本地事务和消息发送的原⼦性,要么都成功,要么都失败。
本地事务与消息发送的原⼦性问题是实现可靠消息最终⼀致性⽅案的关键问题。
先来尝试下这种操作,先发送消息,再操作数据库:这种情况下⽆法保证数据库操作与发送消息的⼀致性,因为可能发送消息成功,据库操作失败。
1 begin transaction;2//1.发送MQ3//2.数据库操作4 commit transation;第⼆种⽅案,先进⾏数据库操作,再发送消息:这种情况下貌似没有问题,如果发送 MQ消息失败,就会抛出异常,导致数据库事务回滚。
但如果是超时异常,数据库回滚,但 MQ其实已经正常发送了,同样会导致不⼀致。
1 begin transaction;2//1.数据库操作3//2.发送MQ4 commit transation;【2】事务参与⽅接收消息的可靠性:事务参与⽅必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。
【3】消息重复消费的问题:由于步骤2的存在,若某⼀个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。
要解决消息重复消费的问题就要实现事务参与⽅的⽅法幂等性。
⼆、解决⽅案【本地消息表⽅案】1 begin transaction;2//1.新增⽤户3//2.存储积分消息⽇志4 commit transation;【2】定时任务扫描⽇志:如何保证将消息发送给消息队列呢?经过第⼀步消息已经写到消息⽇志表中,可以启动独⽴的线程,定时对消息⽇志表中的消息进⾏扫描并发送⾄消息中间件,在消息中间件反馈发送成功后删除该消息⽇志,否则等待定时任务下⼀周期重试。
RocketMQ(11)消息重试机制和死信队列七、消息发送重试机制1 说明Producer对发送失败的消息进⾏重新发送的机制,称为消息发送重试机制,也称为消息重投机制。
对于消息重投,需要注意以下⼏点:⽣产者在发送消息时,若采⽤同步或异步发送⽅式,发送失败会重试,但oneway消息发送⽅式发送失败是没有重试机制的只有普通消息有发送重试机制,顺序消息是没有的(只有默认⾃带的发送选择才有这个功能,若⼿动实现选择器,则⽆法实现重试避错机制,也不需要)消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。
消息重复在 RocketMQ中是⽆法避免的问题消息重复在⼀般情况下不会发⽣,当出现消息量⼤、⽹络抖动,消息重复就会成为⼤概率事件producer主动重发、consumer负载变化(发⽣Rebalance,不会导致消息重复,但可能出现重复消费)也会导致重复消息消息重复⽆法避免,但要避免消息的重复消费。
避免消息重复消费的解决⽅案是,为消息添加唯⼀标识(例如消息key),使消费者对消息进⾏消费判断来避免重复消费消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略2 同步发送失败策略对于普通消息,消息发送默认采⽤round-robin策略来选择所发送到的队列。
如果发送失败,默认重试2 次。
但在重试时是不会选择上次发送失败的Broker,⽽是选择其它Broker。
当然,若只有⼀个Broker其也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue。
// 创建⼀个producer,参数为Producer Group名称DefaultMQProducer producer = new DefaultMQProducer("pg");// 指定nameServer地址producer.setNamesrvAddr("rocketmqOS:9876");// 设置同步发送失败时重试发送的次数,默认为2次producer.setRetryTimesWhenSendFailed(3);// 设置发送超时时限为5s,默认3sproducer.setSendMsgTimeout(5000);同时,Broker还具有失败隔离功能,使Producer尽量选择未发⽣过发送失败的Broker作为⽬标 Broker。
rocketmq实现延迟队列(精确到秒级)开源版本中,只有RocketMQ⽀持延迟消息,且只⽀持18个特定级别的延迟付费版本中,阿⾥云和腾讯云上的MQ产品都⽀持精度为秒级别的延迟消息定时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望⽴马投递这条消息,⽽是推迟到在当前时间点之后的某⼀个时间投递到Consumer进⾏消费,该消息即定时消息。
延时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望⽴马投递这条消息,⽽是延迟⼀定时间后才投递到Consumer进⾏消费,该消息即延时消息。
定时消息与延时消息在代码配置上存在⼀些差异,但是最终达到的效果相同:消息在发送到消息队列RocketMQ版服务端后并不会⽴马投递,⽽是根据消息中的属性延迟固定时间后才投递给消费者。
实现原理(4种实现⽅案)1.代理实现2.时间轮和delay-commit-log实现3.时间轮和时间file实现4.基于rocketmq 18个等级来改造适⽤场景定时消息和延时消息适⽤于以下⼀些场景:消息⽣产和消费有时间窗⼝要求,例如在电商交易中超时未⽀付关闭订单的场景,在订单创建时会发送⼀条延时消息。
这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成⽀付。
如⽀付未完成,则关闭订单。
如已完成⽀付则忽略。
通过消息触发⼀些定时任务,例如在某⼀固定时间点向⽤户发送提醒消息。
使⽤⽅式定时消息和延时消息的使⽤在代码编写上存在略微的区别:发送定时消息需要明确指定消息发送时间点之后的某⼀时间点作为消息投递的时间点。
发送延时消息时需要设定⼀个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
注意事项定时消息的精度会有1s~2s的延迟误差。
定时和延时消息的msg.setStartDeliverTime参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。
如果被设置成当前时间戳之前的某个时刻,消息将⽴刻投递给消费者。
rocketmq 源码执行流程RocketMQ 是一款高性能、高可靠性的分布式消息中间件,广泛应用于大数据、云计算、金融等领域。
以下是RocketMQ 的源码执行流程,主要涉及生产者(Producer)、消息队列(Message Queue)、消息存储(Message Store)、消费者(Consumer)、代理服务器(Broker)、服务器端(Server)和消费组(Consumer Group)等方面。
1. 生产者(Producer)生产者在RocketMQ 中负责发送消息到指定的主题(Topic)。
当生产者发送消息时,会首先根据指定的主题找到相应的Broker,然后通过Broker 将消息发送到服务器端。
在源码中,生产者使用长轮询机制向Broker 发送消息,并重试机制确保消息发送成功。
2. 消息队列(Message Queue)消息队列是RocketMQ 中用于存储消息的组件,每个主题都对应多个消息队列。
生产者发送的消息首先被存储在消息队列中,等待被消费者消费。
在源码中,消息队列采用先进先出(FIFO)的方式存储消息,确保先发送的消息先被消费。
3. 消息存储(Message Store)消息存储是RocketMQ 中用于持久化存储消息的组件。
RocketMQ 支持将消息存储在内存或磁盘上,以满足不同的性能和可靠性需求。
在源码中,消息存储采用分段式存储结构,将大表划分为小表,以提高读写性能。
此外,还采用了多种数据结构如二叉树、哈希表等,以实现高效的索引和查询。
4. 消费者(Consumer)消费者在RocketMQ 中负责接收并消费消息。
消费者通过订阅主题来接收消息,并可以采用拉取或推送的模式从Broker 获取消息。
在源码中,消费者与Broker 建立长连接,通过定时拉取或Broker 推送的方式获取消息。
消费者还支持集群消费和广播消费两种模式,以满足不同的业务需求。
5. 代理服务器(Broker)Broker 是RocketMQ 中的一种服务端组件,负责管理主题和消息队列,提供生产者和消费者之间的通信服务。
一种rocketmq优先级队列的实现方法和装置一、背景RocketMQ是一款由阿里巴巴开源的分布式消息队列系统,广泛应用于实时数据流处理。
然而,现有的RocketMQ系统并未提供对优先级队列的支持。
在实际应用中,有时我们需要根据消息的重要程度或紧急程度对消息进行不同的处理,因此,实现一个具有优先级队列功能的RocketMQ系统具有重要的现实意义。
二、技术实现1. 定义优先级类:首先,我们需要定义一个优先级类,该类包含一个整数值,用于表示消息的优先级。
优先级高的消息将得到优先处理。
2. 消息插入:在RocketMQ的生产者端,当一条新消息需要被插入到队列中时,首先需要将其优先级值封装在消息体中。
然后,将消息发送到RocketMQ的存储层。
3. 优先级过滤:在RocketMQ的消费者端,当消费者启动时,需要设置一个优先级过滤器。
该过滤器根据消费者的优先级设定和接收到的消息的优先级进行比较,只处理优先级高的消息。
4. 优先级调度:在消费者处理优先级高的消息时,可以考虑使用多线程或多进程技术进行并行处理,以提高处理效率。
同时,可以考虑引入延迟队列机制,当一个线程在处理高优先级消息时出现阻塞,可以将其切换到其他线程进行处理。
5. 优先级反馈:为了实现动态调整优先级的机制,可以在消费者端引入优先级反馈机制。
当消费者处理完一条消息后,可以根据反馈结果(如处理时间、处理成功率等)来调整该消费者对不同优先级消息的分配比例。
三、装置实现1. 硬件设施:包括一个RocketMQ存储层,用于存储消息;一个多线程或多进程消费者处理单元,用于接收并处理优先级高的消息;一个优先级反馈模块,用于收集和处理消费者反馈的信息。
2. 软件设施:包括一个生产者端程序,用于将新消息发送到RocketMQ存储层;一个消费者端程序,包括多个线程或进程,用于接收并处理优先级高的消息;一个优先级过滤器,用于根据消费者的优先级设定和接收到的消息的优先级进行比较,只处理优先级高的消息;一个反馈模块,用于收集和处理消费者反馈的信息;一个控制系统,用于动态调整生产者和消费者之间的通信参数和优先级分配比例。
rocketmq延迟队列实现方式摘要:1.RocketMQ 概述2.延迟队列的实现方式3.延迟队列的应用场景4.延迟队列的优缺点正文:1.RocketMQ 概述RocketMQ 是阿里巴巴开源的一款分布式消息中间件,具有高性能、高可靠、高扩展性等特点。
它采用分布式架构,支持大规模消息队列和大量并发生产者/消费者。
RocketMQ 提供了持久化存储、高可用、高容错等功能,保证了消息的可靠传输。
2.延迟队列的实现方式在RocketMQ 中,延迟队列的实现方式主要有两种:(1)基于时间轮询的方式:时间轮询的方式是利用时间戳来实现消息的延迟发送。
生产者将消息发送到指定的队列,并设置消息的延迟时间。
在消息处理过程中,消费者根据消息的延迟时间进行消费。
当消息的延迟时间到达时,消费者将消息从队列中取出并进行处理。
(2)基于事务的方式:事务的方式是利用数据库的事务功能来实现消息的延迟发送。
生产者将消息发送到指定的队列,并将消息的延迟时间存储在数据库中。
消费者从数据库中获取消息的延迟时间,并根据延迟时间进行消费。
当消息的延迟时间到达时,消费者将消息从队列中取出并进行处理。
3.延迟队列的应用场景延迟队列在实际应用中有很多场景,主要包括:(1)异步处理:在高性能系统中,为了避免阻塞和等待,可以使用延迟队列将一些耗时的任务放到延迟队列中,让消费者异步处理。
(2)消息处理:在系统中,有些消息需要定时发送或定时处理,可以使用延迟队列来实现。
(3)数据统计:在数据分析和统计系统中,有些数据需要定时统计和汇总,可以使用延迟队列来实现。
4.延迟队列的优缺点延迟队列的优点主要有:(1)降低系统复杂度:通过将耗时任务放到延迟队列中,可以简化系统逻辑,降低系统复杂度。
(2)提高系统性能:使用延迟队列可以避免阻塞和等待,提高系统的性能。
延迟队列的缺点主要有:(1)可靠性问题:由于延迟队列涉及到数据的持久化和存储,可能会出现数据丢失或损坏的问题。
阿里云的rocketmq用法
阿里云的RocketMQ是一款高可用、高性能、分布式的消息队列服务。
它能够在不同的云计算环境下进行消息传递,支持多语言的SDK 开发,便于应用程序进行消息交互。
下面介绍其用法方法:
1. 创建Topic:在RocketMQ中,一个Topic表示一个消息主题,可用于区分不同类型的消息。
首先,需要在控制台创建Topic,在高可用性考虑下,需要将消息主题的复制数量设置2或以上。
2. 生产者发送消息:消息生产者程序通过调用SDK发送消息到指定的主题下。
消息发送成功后,会返回消息ID。
3. 消息消费者:消息消费者程序需要订阅指定主题,才能够消费该主题下的消息。
在RocketMQ中,可实现监听机制,即当有消息来到时,消费者程序自动被调用,进行消息处理。
同时,RocketMQ支持消息定时功能和多线程消费,方便用户自定义消费方式。
4. 管理消息队列与负载均衡:RocketMQ支持控制台管理消息队列,实现动态增加或减少队列数以及消息负载均衡,确保消息系统始终高可用。
总之,RocketMQ可广泛应用于电商订单、物流信息、金融行业消息等领域,能够高效地进行消息传递与处理。
rocketmq消息处理流程下载温馨提示:该文档是我店铺精心编制而成,希望大家下载以后,能够帮助大家解决实际的问题。
文档下载后可定制随意修改,请根据实际需要进行相应的调整和使用,谢谢!并且,本店铺为大家提供各种各样类型的实用资料,如教育随笔、日记赏析、句子摘抄、古诗大全、经典美文、话题作文、工作总结、词语解析、文案摘录、其他资料等等,如想了解不同资料格式和写法,敬请关注!Download tips: This document is carefully compiled by theeditor. I hope that after you download them,they can help yousolve practical problems. The document can be customized andmodified after downloading,please adjust and use it according toactual needs, thank you!In addition, our shop provides you with various types ofpractical materials,such as educational essays, diaryappreciation,sentence excerpts,ancient poems,classic articles,topic composition,work summary,word parsing,copy excerpts,other materials and so on,want to know different data formats andwriting methods,please pay attention!RocketMQ 消息处理流程。
1. 生产者发送消息。
rocketmq使用示例
一、引言
在当今分布式系统中,消息队列发挥着越来越重要的作用。
RocketMQ作为阿里巴巴开源的一款高性能、高可靠、高可用的消息中间件,受到了广泛关注。
本文将通过一系列使用示例,为大家介绍RocketMQ的基本使用方法、集群与扩展、性能调优、安全与监控等方面的内容,帮助大家更好地应用RocketMQ。
二、RocketMQ简介
1.产品背景
随着业务的发展,系统间的通信变得越来越复杂。
传统的同步调用方式难以满足高并发、低延迟的需求。
因此,消息队列应运而生。
RocketMQ作为阿里巴巴内部使用的一款高性能消息队列,经过多年优化,具备较高的性能和稳定性。
2.产品特点
RocketMQ具有以下特点:
1) 高性能:RocketMQ采用分布式架构,具有良好的横向扩展能力,可支持大规模消息队列场景。
2) 高可用:RocketMQ支持数据持久化,保证数据不丢失;同时,支持多副本机制,提高系统抗故障能力。
3) 高可靠性:RocketMQ采用分布式事务确保消息的一致性,满足企业级应用需求。
4) 丰富的消息类型:支持普通消息、顺序消息、消息标签等多种消息类型,满足不同场景需求。
一、rocketmq延迟队列概述rocketmq是一个开源的分布式消息中间件,具有高性能、高可靠、低延迟等特点。
延迟消息是指消息发送后,并不是立即被用户接收,而是在一定的时间延迟后才能被用户接收。
rocketmq提供了延迟队列的功能,可以方便地实现延迟消息的发送和消费。
二、rocketmq延迟队列的应用场景1. 订单超时提醒:在电商评台中,用户下单后需要在一定时间内完成支付,可以使用延迟队列来发送订单超时提醒消息,在订单超时时触发提醒操作。
2. 任务调度:在分布式任务调度系统中,可以使用延迟队列来发送定时任务消息,实现任务的调度和执行。
3. 实时监控:在监控系统中,可以使用延迟队列来发送告警消息,在一定时间内未恢复时触发告警操作。
4. 定时消息推送:在社交应用中,可以使用延迟队列来发送定时消息推送,在指定时间触发消息推送操作。
三、rocketmq延迟队列的实现方式rocketmq延迟队列的实现方式主要是通过设置消息的延迟级别来实现的。
延迟级别是通过设置消息的属性来指定消息的延迟时间,rocketmq提供了多个预定义的延迟级别,也可以自定义延迟级别。
使用延迟级别发送的消息将在指定的延迟时间后变为可消费状态,用户可以接收并处理这些延迟消息。
四、rocketmq延迟队列的使用方法1. 发送延迟消息通过设置消息的延迟级别,将消息发送到rocketmq的延迟队列中。
在发送消息时,需要指定消息的延迟级别,rocketmq会根据延迟级别的设置将消息存入对应的延迟队列中。
2. 消费延迟消息用户需要订阅延迟队列中的消息,根据消息的延迟级别来消费延迟消息。
用户在接收到延迟消息后,可以根据业务逻辑进行处理。
3. 配置延迟级别在rocketmq的配置文件中,可以设置预定义的延迟级别和对应的延迟时间。
也可以通过代码自定义延迟级别和延迟时间,灵活控制消息的延迟发送和消费。
五、rocketmq延迟队列的优缺点1. 优点(1) 实现简单:通过设置消息的延迟级别,即可实现延迟消息的发送和消费。
rocketmq使用示例以下是一个使用RocketMQ的简单示例:1. 引入RocketMQ客户端库依赖:```xml<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1</version></dependency>```2. 发送消息到消息队列:```javaimport org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.SendStatus;import mon.message.Message;public class RocketMQProducer {public static void main(String[] args) throws MQClientException {// 实例化生产者对象DefaultMQProducer producer = new DefaultMQProducer("example_group");// 设置NameServer地址producer.setNamesrvAddr("127.0.0.1:9876");// 启动生产者producer.start();try {// 创建消息对象Message message = new Message("example_topic", "TagA", "Hello RocketMQ".getBytes());// 发送消息,并获取发送结果SendResult result = producer.send(message);// 判断消息是否发送成功if (result.getSendStatus() == SendStatus.SEND_OK) { System.out.println("消息发送成功");} else {System.out.println("消息发送失败");}} catch (Exception e) {e.printStackTrace();}// 关闭生产者producer.shutdown();}}```3. 从消息队列接收消息:```javaimportorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer; importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentl yContext;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentl yStatus;importorg.apache.rocketmq.client.consumer.listener.MessageListenerCon currently;import mon.message.MessageExt; import java.util.List;public class RocketMQConsumer {public static void main(String[] args) throws Exception {// 实例化消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");// 设置NameServer地址consumer.setNamesrvAddr("127.0.0.1:9876");// 设置订阅的主题和标签consumer.subscribe("example_topic", "*");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 处理接收到的消息for (MessageExt msg : msgs) {System.out.println("接收到消息: " + newString(msg.getBody()));}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.println("消费者已启动");}}```这是一个简单的RocketMQ使用示例,其中生产者将消息发送到指定的主题和标签,而消费者订阅该主题和标签,并处理接收到的消息。
rocketmq消息及流程1、为什么⽤mq优势主要有3个:应⽤解耦(降低微服务之间的关联)、异步提速(微服务拿到mq消息后同时⼯作)、削峰填⾕(可以消息堆积)劣势系统可⽤性降低(MQ⼀旦宕机整个系统不可⽤)复杂度提⾼(需要解决系统消息⼀致性、重复消费...)⼀致性问题(不同系统拿到mq中的消息后,部分系统处理失败怎么办)2、rocketmq集群⼯作流程由上图可以看出,rocketMQ集群=消息服务器集群+命名服务器集群,其中消息服务器集群=⽣产者集群+broker集群+消费者集群。
命名服务器集群(nameserver cluster)●命名服务器集群是管理⽣产者、broker、消费者的纽带,哪个⽣产者/broker/消费者可⽤都是通过命名服务器得知其信息,所以⽣产者/broker/消费者都需要定时发送⼼跳给命名服务器●命名服务器与⽣产者的关系:命名服务器记录有许多broker的ip地址,每个⽣产者发送消息到broker前都需要先去命名服务器获取某个broker的ip,然后再发送消息到broker ●命名服务器和消息者的关系:命名服务器记录有许多broker的ip地址,消费者想监听broker中的消息,需要先去命名服务器获取某个broker的ip,然后再监听broker中的消息⽣产者集群(producer cluster)●每个⽣产者部署在不同的IP上形成了集群●⽣产者的消息=topic+tag,topic⽤来区分消息类型,⼀种topic类型的消息可以分布在多个不同的broker中,同类型的消息就⽤tag区分,如我们系统⾥的佣⾦宝的topic是"topic-yjb",然后佣⾦宝下⾯可以划分多个tag消费者集群(consumer cluster)●每个消费者部署在不同的IP上形成了集群●消费者获取某个broker中的消息理论上有两种⽅法:○ pull拉取模式:消费者开启线程定时访问broker,如有消息存在则拉取,缺点是太消耗消费者的资源了,不管有没有消息都会去访问broker○ push推送模式:消费者起⼀个监听器监听broker(与broker建⽴⼀个长链接),若broker中有消息,则broker会⾃动推送消息给消费者,⼀般⽤这种。
RocketMq消息队列实施方案-完整版消息队列实施方案1、背景异步解耦合、给前端系统提供最高效的反应2、常见消息队列对比2、1 ActiveMqActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现优点:Java语言支持集群模式缺点:性能在消息中间件中处于下游2、2 RabbitmqRabbitmq是基于AMQP使用erlang语言实现的消息队列系统优点:1、完整的消息队列系统,支持多种消息队列模式,包括竞争消费;2、支持集群模式,扩展集群容量和性能比较方便,集成了集群的监控和管理;3、支持消息的持久化;缺点:1、需要学习比较复杂的接口和协议,比较耗费时间;2、性能不是特别理想大概在1wqps左右;3、使用Erlang语言,语言基础;2、3 KafkaKafka 是LinkedIn 开发的一个高性能、分布式的消息发布订阅系统。
优点:1、分布式集群可以透明的扩展,增加新的服务器进集群。
2、高性能。
单机写入TPS约在百万条/秒3、容错。
数据都会复制到几台服务器上。
缺点:1、复杂性。
Kafka需要zookeeper 集群的支持,Topic通常需要人工来创建,部署和维护较一般消息队列成本更高定位于日志传输、存在消息丢失的肯能、消息乱序3、消息发送错误无重试2、4 RocketMQRockerMq 是阿里公司中间件团队参考Kafka 思想,用Java语言实现的消息传输系统优点:1、较高性能,单机写入TPS单实例约7万条/秒2、容错,多种集群模式、可以解决容错问题3、消息重试发送4、顺序消息可以严格执行缺点:1、消息重复、消费端需要做去重操作2、5 选用结论从项目业务与团队技术偏向考虑,我们应该需要一种数据安全性比较高,保证每个消息都会被执行;有容错机制、支持集群模式高可用;性能不错,可以在毫秒级处理消息;支持顺序消息的消息中间件,RockerMq 可以满足这些要求。
都写成功,才返回成功。
优点:数据与服务都无单点,Master 宕机情冴下,消费者可以从slave消费、消息无延迟,服务可用性与数据可用性都非常高缺点:1、性能比异步复制模式略低,収送单个消息的RT(返回时间)会略高。
2、目前master宕机后,备机不能自动切换为主机。
只有master可以接收消息,若所有master宕机,将不能接收消息选用结论由于我们需要保证消息中间件的高可用性,消息不丢失、消息无延迟,所以我们选择“多Master 多Slave模式,同步双写”模式。
并且选择同步刷盘。
3、2、2 多Master多Slave模式多master多slave模式网络结构图主要组件有:Name Server、Broker、Producer、Consumer1、Name Server是一个几乎无状态节点,可集群部署,节点之间无信息同步、记录Topic 路由信息。
2、Broker分为Master和Slave,一个Master 可以对应多个Slave,但是一个Slave只能对应一个Master。
3、Producer与Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server获取Topic 路由信息,并向提供Topic 服务的Master 建立长连接,定时向Master 发送心跳。
Producer只可以向Master发送消息。
Producer 完全无状态,可集群部署。
4、Consumer与Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server获取Topic路由信息,并与提供Topic 服务的Master、Slave建立长连接,并定时向Master、Slave収送心跳。
Consumer既可以从Master 订阅消息,也可以从Slave 订阅消息,订阅规则由Broker配置决定3、3 集群搭建linux 环境下部署rocketMq多master多slave 模式、同步双写模式集群,暂定为2个master,2个slave3、3、1 安装条件4台linux服务器、分为master-a、slave-a ; master-b、slave-b服务器防火墙开启9876,10911lokkit -p 9876:tcp -p 10911:tcp服务器支持wget命令服务器安装jdk,不低于使用的rocketMq的支持版本3、3、2 安装步骤4台linux服务器、分为master-a、slave-a ; master-b、slave-b假设ip分别为:master-a =10.1.236.1slave–a =10.1.236.2master-b =10.1.236.3slave-b =10.1.236.43、3、2、1 master-a1 从github下载RocketMQ安装包或源码自编译安装wgethttps:///alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz 2 解压缩、并创建数据、日志目录tar –xvf alibaba-rocketmq-3.2.2.tar.gz3 配置环境变量:系统变量:Vi /etc/profile或者修改当前用户的环境变量例如:export ROCKETMQ_HOME=/opt/RocketMQ/alibaba-rocketmqexport PATH=${PATH}:${ROCKETMQ_HOME}/binsource 命令是环境变量生效4 修改mq集群的master-a 配置修改文件$ROCKETMQ_HOME/conf/2m-2s-sync/brok er-a.properties不是强制必须使用这个文件,使用者可以自行定义# brokerClusterName=DefaultClusterbrokerName=broker-a #归属master-slave组的名字brokerId=0 #0表示为master-slave组中为 masternamesrvAddr=10.1.236.1:9876;10.1.236.2:9876;10.1.236.3:9876;10.1.236.4:9876 #nameserv defaultTopicQueueNums=4autoCreateTopicEnable=trueautoCreateSubscriptionGroup=truelistenPort=10911 #Broker 对外服务的监听端口deleteWhen=04fileReservedTime=120mapedFileSizeCommitLog=1073741824mapedFileSizeConsumeQueue=50000000destroyMapedFileIntervalForcibly=120000redeleteHangedFileInterval=120000diskMaxUsedSpaceRatio=88storePathRootDir=/opt/RocketMQ/alibaba-rocketmq/data #数据目录storePathCommitLog=/opt/RocketMQ/alibaba-rocketmq/logs #日志目录maxMessageSize=65536flushCommitLogLeastPages=4flushConsumeQueueLeastPages=2flushCommitLogThoroughInterval=10000flushConsumeQueueThoroughInterval=60000checkTransactionMessageEnable=falsesendMessageThreadPoolNums=128pullMessageThreadPoolNums=128brokerRole=SYNC_MASTER #角色同步双写MasterflushDiskType=SYNC_FLUSH #同步刷盘brokerIP1=10.1.236.1 #本机IP地址,多网卡易出错,请手工指定其他配置请参考《RocketMQ 开发指南》,最新版针对v3.2.45 启动mq集群的master-a跳转到RocketMQ的bin目录下>cd $ROCKETMQ_HOME/bin>nohup sh mqnamesrv &>nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.p roperties &3、3、2、2 slave-a1从github下载RocketMQ安装包或源码自编译安装wgethttps:///alibaba/RocketMQ/releases/download/v3.2.2/alibaba-rocketmq-3.2.2.tar.gz2 解压缩、并创建数据、日志目录tar –xvf alibaba-rocketmq-3.2.2.tar.gz3 配置环境变量例如:export ROCKETMQ_HOME=/opt/RocketMQ/alibaba-rocketmqexport PATH=${PATH}:${ROCKETMQ_HOME}/binsource 命令是环境变量生效4修改mq集群的slave-a 配置修改文件$ROCKETMQ_HOME/conf/2m-2s-sync/brok er-a-s.properties不是强制必须使用这个文件,使用者可以自行定义、只要保证配置文件内的brokerName 正确即可# brokerClusterName=DefaultClusterbrokerName=broker-a #归属master-slave组的名字brokerId=1 #1表示在master-slave组中为slavenamesrvAddr=10.1.236.1:9876;10.1.236.2:9876;10.1.236.3:9876;10.1.236.4:9876 defaultTopicQueueNums=4autoCreateTopicEnable=trueautoCreateSubscriptionGroup=truelistenPort=10911 #对外端口deleteWhen=04fileReservedTime=120mapedFileSizeCommitLog=1073741824mapedFileSizeConsumeQueue=50000000destroyMapedFileIntervalForcibly=120000redeleteHangedFileInterval=120000diskMaxUsedSpaceRatio=88storePathRootDir=/aifs01/users/tstusr12/opt/RocketMQ/alibaba-rocketmq/data #数据存放storePathCommitLog=/aifs01/users/tstusr12/opt/RocketMQ/alibaba-rocketmq/logs #日志存放maxMessageSize=65536flushCommitLogLeastPages=4flushConsumeQueueLeastPages=2flushCommitLogThoroughInterval=10000flushConsumeQueueThoroughInterval=60000checkTransactionMessageEnable=falsesendMessageThreadPoolNums=128pullMessageThreadPoolNums=128brokerRole=SLAVE #角色 SlaveflushDiskType=SYNC_FLUSH # 同步刷盘brokerIP1=10.1.236.2 #本机ip,多网卡,建议自定义其他配置请参考《RocketMQ 开发指南》,最新版针对v3.2.45 启动mq集群的slave-a跳转到RocketMQ的bin目录下>cd $ROCKETMQ_HOME/bin>nohup sh mqnamesrv &>nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s .properties &3、3、2、3 master-b1 从github下载RocketMQ安装包或源码自编译安装wgethttps:///alibaba/RocketMQ/releases/download/v3.2.2/alibaba-rocketmq-3.2.2.tar.gz2 解压缩、并创建数据、日志目录tar –xvf alibaba-rocketmq-3.2.2.tar.gz3 配置环境变量例如:export ROCKETMQ_HOME=/opt/RocketMQ/alibaba-rocketmqexport PATH=${PATH}:${ROCKETMQ_HOME}/binsource 命令是环境变量生效4 修改mq集群的master-b配置修改文件$ROCKETMQ_HOME/conf/2m-2s-sync/brok er-b.properties不是强制必须使用这个文件,使用者可以自行定义# brokerClusterName=DefaultClusterbrokerName=broker-b #归属master-slave组的名字brokerId=0 #0表示为master-slave组中为 masternamesrvAddr=10.1.236.1:9876;10.1.236.2:9876;10.1.236.3:9876;10.1.236.4:9876 #nameserv defaultTopicQueueNums=4autoCreateTopicEnable=trueautoCreateSubscriptionGroup=truelistenPort=10911 #Broker 对外服务的监听端口deleteWhen=04fileReservedTime=120mapedFileSizeCommitLog=1073741824mapedFileSizeConsumeQueue=50000000destroyMapedFileIntervalForcibly=120000redeleteHangedFileInterval=120000diskMaxUsedSpaceRatio=88storePathRootDir=/opt/RocketMQ/alibaba-rocketmq/data #数据目录storePathCommitLog=/opt/RocketMQ/alibaba-rocketmq/logs #日志目录maxMessageSize=65536flushCommitLogLeastPages=4flushConsumeQueueLeastPages=2flushCommitLogThoroughInterval=10000flushConsumeQueueThoroughInterval=60000checkTransactionMessageEnable=falsesendMessageThreadPoolNums=128pullMessageThreadPoolNums=128brokerRole=SYNC_MASTER #角色同步双写MasterflushDiskType=SYNC_FLUSH #同步刷盘brokerIP1=10.1.236.3 #本机IP地址,多网卡易出错,请手工指定其他配置请参考《RocketMQ 开发指南》,最新版针对v3.2.45 启动mq集群的master-b跳转到RocketMQ的bin目录下>cd $ROCKETMQ_HOME/bin>nohup sh mqnamesrv &>nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.p roperties &3、3、2、4 slave-b1从github下载RocketMQ安装包或源码自编译安装wgethttps:///alibaba/RocketMQ/releases/download/v3.2.2/alibaba-rocketmq-3.2.2.tar.gz2 解压缩、并创建数据、日志目录tar –xvf alibaba-rocketmq-3.2.2.tar.gz3 配置环境变量例如:export ROCKETMQ_HOME=/opt/RocketMQ/alibaba-rocketmqexport PATH=${PATH}:${ROCKETMQ_HOME}/binsource 命令是环境变量生效4修改mq集群的slave-a 配置修改文件$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties不是强制必须使用这个文件,使用者可以自行定义、只要保证配置文件内的brokerName 正确即可# brokerClusterName=DefaultClusterbrokerName=broker-b #归属master-slave组的名字brokerId=1 #1表示在master-slave组中为slavenamesrvAddr=10.1.236.1:9876;10.1.236.2:9876;10.1.236.3:9876;10.1.236.4:9876 defaultTopicQueueNums=4autoCreateTopicEnable=trueautoCreateSubscriptionGroup=truelistenPort=10911 #对外端口deleteWhen=04fileReservedTime=120mapedFileSizeCommitLog=1073741824mapedFileSizeConsumeQueue=50000000destroyMapedFileIntervalForcibly=120000redeleteHangedFileInterval=120000diskMaxUsedSpaceRatio=88storePathRootDir=/aifs01/users/tstusr12/opt/RocketMQ/alibaba-rocketmq/data #数据存放storePathCommitLog=/aifs01/users/tstusr12/opt/RocketMQ/alibaba-rocketmq/logs #日志存放maxMessageSize=65536flushCommitLogLeastPages=4flushConsumeQueueLeastPages=2flushCommitLogThoroughInterval=10000flushConsumeQueueThoroughInterval=60000checkTransactionMessageEnable=falsesendMessageThreadPoolNums=128pullMessageThreadPoolNums=128brokerRole=SLAVE #角色 SlaveflushDiskType=SYNC_FLUSH # 同步刷盘brokerIP1=10.1.236.4 #本机ip,多网卡,建议自定义其他配置请参考《RocketMQ 开发指南》,最新版针对v3.2.45 启动mq 集群的slave-b跳转到RocketMQ 的bin 目录下>cd $ROCKETMQ_HOME/bin>nohup sh mqnamesrv &>nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s .properties &4、MQ 消息服务接口实现4.1、流程图 消费端应用消息中间件MQ 服务器Http 请求发送消息消息发送至服务器监听消息并拉取消息通过backurl 将消息推送至消费端返回发送结果4.2、消息中间件接口规范此服务接口以dubbo提供的restful协议对外提供发送消息服务,并通过backurl回调消费端把消息推送给消费者,使用此服务可以通过http post请求的方式,消费端要提供接受消息的http协议的post接口。