RabbitMQ的实战应用
- 格式:pptx
- 大小:719.15 KB
- 文档页数:31
python实现RabbitMQ同步跟异步消费模型1,消息推送类1 import pika234# 同步消息推送类5 class RabbitPublisher(object):67# 传⼊RabbitMQ的ip,⽤户名,密码,实例化⼀个管道8 def __init__(self, host, user, password):9 self.host = host10 er = user11 self.password = password12 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=pika.PlainCredentials(er, self.password)))13 self.channel = self.connection.channel()1415# 发送消息在队列中16 def send(self, queue_name, body):17 self.channel.queue_declare(queue=queue_name, durable=True) # 声明⼀个持久化队列18 self.channel.basic_publish(exchange='',19 routing_key=queue_name, # 队列名字20 body=body, # 消息内容21 properties=pika.BasicProperties(22 delivery_mode=2, # 消息持久化23 ))2425# 清除指定队列的所有的消息26 def purge(self, queue_name):27 self.channel.queue_purge(queue_name)2829# 删除指定队列30 def delete(self, queue_name, if_unused=False, if_empty=False):31 self.channel.queue_delete(queue_name, if_unused=if_unused, if_empty=if_empty)3233# 断开连接34 def stop(self):35 self.connection.close()View Code2.消息消费类(1)同步消息消费在同步消息消费的时候可能会出现pika库断开的情况,原因是因为pika客户端没有及时发送⼼跳,连接就被server端断开了。
基于公有云的RabbitMQ双向数据同步方案一、测试环境阿里云MQ:172.16.130.204(master) / 172.16.128.63(slave)微软云MQ:172.16.192.9(master) / 172.16.192.28(slave)同步组件:RabbitMQ federation-upstream同步方式:双向同步二、方案目标及介绍验证基于不同公有云的数据中心,支持MQ消息队列的业务数据同步方案。
下面配置按先后顺序,分别在阿里云MQ节点和微软云MQ节点进行配置,完成后实现两地消息生产者产生消息会在本地留存一份,并立即转发到对端(各自上游MQ节点),使两地消费者可以及时读取全部消息。
三、阿里云MQ1.创建federation-upstreamURL:amqp://admin:************.192.9(定义上游为微软云香港MQ节点)Reconnect Delay:5sAck mode:on-confirm目的是将上游的微软云香港MQ节点收到的信息,推送到当前MQ节点(下游)。
2.创建exchange3.创建queue4.绑定exchange、queue5.创建policy这条策略是为了在当前阿里云MQ,匹配所有以dual开头的exchange、queue,按federation-upstream定义执行。
策略配置完成即生效,Federation会自动在上游微软云MQ节点完成下列操作:a.创建同名交换器dual.exchangeb.创建federation类型交换器:federation: dual.exchange -> rabbit@iz2zegotcocceakkehlbi3z Bc.创建同名队列dual.queued.创建federation类型队列:federation: dual.exchange -> rabbit@iz2zegotcocceakkehlbi3ze.绑定dual.exchange和federation类型交换器f.绑定federation类型交换器和federation类型队列g.配置federation类型队列的消费者:阿里云MQ节点6.检查连接状态阿里云端MQ的federation中,同步策略运行正常微软云香港MQ的connections中,阿里云MQ服务器已连接成功。
SpringBoot缓存实战Caffeine⽰例Caffeine和Spring Boot集成Caffeine是使⽤Java8对Guava缓存的重写版本,在Spring Boot 2.0中将取代Guava。
如果出现Caffeine,CaffeineCacheManager将会⾃动配置。
使⽤spring.cache.cache-names属性可以在启动时创建缓存,并可以通过以下配置进⾏⾃定义(按顺序):spring.cache.caffeine.spec:定义的特殊缓存com.github.benmanes.caffeine.cache.CaffeineSpec: bean定义com.github.benmanes.caffeine.cache.Caffeine: bean定义例如,以下配置创建⼀个foo和bar缓存,最⼤数量为500,存活时间为10分钟:spring.cache.cache-names=foo,barspring.cache.caffeine.spec=maximumSize=500,expireAfterAccess=600s除此之外,如果定义了com.github.benmanes.caffeine.cache.CacheLoader,它会⾃动关联到CaffeineCacheManager。
由于该CacheLoader将关联被该缓存管理器管理的所有缓存,所以它必须定义为CacheLoader<Object, Object>,⾃动配置将忽略所有泛型类型。
引⼊依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-cache</artifactId></dependency><dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>2.6.0</version></dependency>开启缓存的⽀持使⽤@EnableCaching注解让Spring Boot开启对缓存的⽀持@SpringBootApplication@EnableCaching// 开启缓存,需要显⽰的指定public class SpringBootStudentCacheCaffeineApplication {public static void main(String[] args) {SpringApplication.run(SpringBootStudentCacheCaffeineApplication.class, args);}}配置⽂件新增对缓存的特殊配置,如最⼤容量、过期时间等spring.cache.cache-names=peoplespring.cache.caffeine.spec=initialCapacity=50,maximumSize=500,expireAfterWrite=10s,refreshAfterWrite=5s如果使⽤了refreshAfterWrite配置还必须指定⼀个CacheLoader,如:/*** 必须要指定这个Bean,refreshAfterWrite=5s这个配置属性才⽣效** @return*/@Beanpublic CacheLoader<Object, Object> cacheLoader() {CacheLoader<Object, Object> cacheLoader = new CacheLoader<Object, Object>() {@Overridepublic Object load(Object key) throws Exception {return null;}// 重写这个⽅法将oldValue值返回回去,进⽽刷新缓存@Overridepublic Object reload(Object key, Object oldValue) throws Exception {return oldValue;}};return cacheLoader;}Caffeine配置说明:1. initialCapacity=[integer]: 初始的缓存空间⼤⼩2. maximumSize=[long]: 缓存的最⼤条数3. maximumWeight=[long]: 缓存的最⼤权重4. expireAfterAccess=[duration]: 最后⼀次写⼊或访问后经过固定时间过期5. expireAfterWrite=[duration]: 最后⼀次写⼊后经过固定时间过期6. refreshAfterWrite=[duration]: 创建缓存或者最近⼀次更新缓存后经过固定的时间间隔,刷新缓存7. weakKeys: 打开key的弱引⽤8. weakValues:打开value的弱引⽤9. softValues:打开value的软引⽤10. recordStats:开发统计功能注意:1. expireAfterWrite和expireAfterAccess同事存在时,以expireAfterWrite为准。
SpringBoot整合RabbitMQ,简易的队列发送短信实例在这个界⾯⾥⾯我们可以做些什么?可以⼿动创建虚拟host,创建⽤户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。
⾸先先介绍⼀个简单的⼀个消息推送到接收的流程,提供⼀个简单的图:黄⾊的圈圈就是我们的消息推送服务,将消息推送到中间⽅框⾥⾯也就是 rabbitMq的服务器,然后经过服务器⾥⾯的交换机、队列等各种关系(后⾯会详细讲)将数据处理⼊列后,最终右边的蓝⾊圈圈消费者获取对应监听的消息。
rabbitMq简单编码 (实例:发送短信)⾸先创建 rabbitmq-provider,pom.xml⾥⽤到的jar依赖:<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId></dependency><dependency><groupId>com.alibaba.mq-amqp</groupId><artifactId>mq-amqp-client</artifactId><version>1.0.5</version></dependency>application.yml 配置## 配置rabbitMQ 信息rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest# 开启发送确认publisher-confirms: true# 开启发送失败退回publisher-returns: true# 消息 rabbitmq 的⾃定义相关配置rabbit:msg:exchange:fanout:name: msg_fanout_exchangetopic:name: msg_topic_exchangealternate:name: msg_alternate_exchangedead:name: msg_dead_exchangequeue:sms:name: msg.sms.senddead:name: msg.sms.dead.sendupstream:name: msg.sms.upstreamalternate:name: msg.alternateroute:sms: msg.sms.sendupstream: msg.sms.upstream消息 rabbitMq 属性配置配置交换机,并绑定/*** @desc:消息 rabbitMq 属性配置* @author:* @date: 2020/6/24 15:40* @version: 3.0.0* @since: 3.0.0*/@RefreshScope@Componentpublic class RabbitMqMsgProperties {// 扇形交换机名称@Value("${}")private String fanoutExchangeName;// 备份换机名称@Value("${}")private String alternateExchangeName;// TOPIC换机名称@Value("${}")private String topicExchangeName;// 消息死信交换机名称@Value("${}")private String deadExchangeName;// 备份队列名称@Value("${}")private String alternateQueueName;// 短信消息队列名称@Value("${}")private String smsQueueName;// 短信消息死信队列名称@Value("${}")private String smsDeadQueueName;// 邮件消息队列名称@Value("${}")private String emailQueueName;// 邮件消息死信队列名称@Value("${}")private String emailDeadQueueName;// 上⾏消息队列名称@Value("${}")private String upstreamQueueName;// 短信消息路由键@Value("${rabbit.msg.route.sms}")private String smsRouteKey;// 邮件消息路由键@Value("${rabbit.msg.route.email}")private String emailRouteKey;// 上⾏消息路由键@Value("${rabbit.msg.route.upstream}")private String upstreamRouteKey;// 微信消息队列名称@Value("${}")private String wxQueueName;// 微信消息路由键@Value("${rabbit.msg.route.wx}")private String wxRouteKey;// 微信消息死信队列名称@Value("${}")private String wxDeadQueueName;public String getFanoutExchangeName() {return fanoutExchangeName;}public void setFanoutExchangeName(String fanoutExchangeName) { this.fanoutExchangeName = fanoutExchangeName;}public String getSmsQueueName() {return smsQueueName;}public void setSmsQueueName(String smsQueueName) {this.smsQueueName = smsQueueName;}public String getEmailQueueName() {return emailQueueName;}public void setEmailQueueName(String emailQueueName) {this.emailQueueName = emailQueueName;}public String getUpstreamQueueName() {return upstreamQueueName;}public void setUpstreamQueueName(String upstreamQueueName) {this.upstreamQueueName = upstreamQueueName;}public String getTopicExchangeName() {return topicExchangeName;}public void setTopicExchangeName(String topicExchangeName) {this.topicExchangeName = topicExchangeName;}public String getSmsRouteKey() {return smsRouteKey;}public void setSmsRouteKey(String smsRouteKey) {this.smsRouteKey = smsRouteKey;}public String getEmailRouteKey() {return emailRouteKey;}public void setEmailRouteKey(String emailRouteKey) {this.emailRouteKey = emailRouteKey;}public String getUpstreamRouteKey() {return upstreamRouteKey;}public void setUpstreamRouteKey(String upstreamRouteKey) {this.upstreamRouteKey = upstreamRouteKey;}public String getAlternateExchangeName() {return alternateExchangeName;}public void setAlternateExchangeName(String alternateExchangeName) {this.alternateExchangeName = alternateExchangeName;}public String getAlternateQueueName() {return alternateQueueName;}public void setAlternateQueueName(String alternateQueueName) {this.alternateQueueName = alternateQueueName;}public String getSmsDeadQueueName() {return smsDeadQueueName;}public void setSmsDeadQueueName(String smsDeadQueueName) {this.smsDeadQueueName = smsDeadQueueName;}public String getEmailDeadQueueName() {return emailDeadQueueName;}public void setEmailDeadQueueName(String emailDeadQueueName) {this.emailDeadQueueName = emailDeadQueueName;}public String getDeadExchangeName() {return deadExchangeName;}public void setDeadExchangeName(String deadExchangeName) {this.deadExchangeName = deadExchangeName;}public String getWxQueueName() {return wxQueueName;}public void setWxQueueName(String wxQueueName) {this.wxQueueName = wxQueueName;}public String getWxRouteKey() {return wxRouteKey;}public void setWxRouteKey(String wxRouteKey) {this.wxRouteKey = wxRouteKey;}public String getWxDeadQueueName() {return wxDeadQueueName;}public void setWxDeadQueueName(String wxDeadQueueName) {this.wxDeadQueueName = wxDeadQueueName;}}package cn.ygyg.bps.msg.api.conf;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;import javax.annotation.Resource;import java.util.HashMap;import java.util.Map;/*** @desc:消息 rabbitmq 配置类* @author: guanliang.xue* @date: 2020/6/24 15:07* @version: 3.0.0* @since: 3.0.0*/@Configuration@ConditionalOnBean(value = RabbitMqMsgProperties.class)public class RabbitMqMsgConfig {@Resourceprivate RabbitMqMsgProperties rabbitMqMsgProperties;/*** 定义备份交换机* @return 备份交换机*/@Beanpublic FanoutExchange alternateExchange(){return new FanoutExchange(rabbitMqMsgProperties.getAlternateExchangeName());}/*** 定义扇形交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(rabbitMqMsgProperties.getFanoutExchangeName());}/*** 定义TOPIC交换机* @return*/@Beanpublic TopicExchange topicExchange(){Map<String, Object> arguments = new HashMap<>();return new TopicExchange(rabbitMqMsgProperties.getTopicExchangeName(),true,false,arguments); }/*** 死信交换机* @return*/@Beanpublic DirectExchange deadExchange(){return new DirectExchange(rabbitMqMsgProperties.getDeadExchangeName(),true,false);}/*** 备份队列* @return 队列*/@Beanpublic Queue alternateQueue(){return new Queue(rabbitMqMsgProperties.getAlternateQueueName());}/*** 短信队列* @return*/@Beanpublic Queue smsQueue(){Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机// x-dead-letter-routing-key 死信路由key,默认使⽤原有路由keyargs.put("x-dead-letter-exchange", rabbitMqMsgProperties.getDeadExchangeName());return QueueBuilder.durable(rabbitMqMsgProperties.getSmsQueueName()).withArguments(args).build();}/*** 微信队列* @return*/@Beanpublic Queue wxQueue(){Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机// x-dead-letter-routing-key 死信路由key,默认使⽤原有路由keyargs.put("x-dead-letter-exchange", rabbitMqMsgProperties.getDeadExchangeName());return QueueBuilder.durable(rabbitMqMsgProperties.getWxQueueName()).withArguments(args).build();}/*** 邮件队列* @return*/@Beanpublic Queue emailQueue(){Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机// x-dead-letter-routing-key 死信路由key,默认使⽤原有路由keyargs.put("x-dead-letter-exchange", rabbitMqMsgProperties.getDeadExchangeName());return QueueBuilder.durable(rabbitMqMsgProperties.getEmailQueueName()).withArguments(args).build();}/*** sms 死信队列* @return*/@Beanpublic Queue smsDeadQueue(){return new Queue(rabbitMqMsgProperties.getSmsDeadQueueName());}/*** email 死信队列* @return*/@Beanpublic Queue emailDeadQueue(){return new Queue(rabbitMqMsgProperties.getEmailDeadQueueName());}/*** wx 死信队列* @return*/@Beanpublic Queue wxDeadQueue(){return new Queue(rabbitMqMsgProperties.getWxDeadQueueName());}/*** 服务商上⾏队列* @return*/@Beanpublic Queue upstreamQueue(){return new Queue(rabbitMqMsgProperties.getUpstreamQueueName());}/*** 绑定sms死信列到死信交换机* @param queue 死信队列* @return*/@Beanpublic Binding bindSmsDead(@Qualifier("smsDeadQueue") Queue queue,@Qualifier("deadExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(rabbitMqMsgProperties.getSmsRouteKey());}/*** 绑定email死信列到死信交换机* @param queue 死信队列* @return*/@Beanpublic Binding bindEmailDead(@Qualifier("emailDeadQueue") Queue queue,@Qualifier("deadExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(rabbitMqMsgProperties.getEmailRouteKey());}/*** 绑定wx死信列到死信交换机* @param queue 死信队列* @return*/@Beanpublic Binding bindWxDead(@Qualifier("wxDeadQueue") Queue queue,@Qualifier("deadExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(rabbitMqMsgProperties.getWxRouteKey());}/*** 绑定备份列到备份交换机* @param queue 备份队列* @return*/@Beanpublic Binding bindAlternate(@Qualifier("alternateQueue") Queue queue,@Qualifier("alternateExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}/*** 绑定短信队列到TOPIC交换机* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindSms(@Qualifier("smsQueue") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(rabbitMqMsgProperties.getSmsRouteKey());}/*** 绑定邮件队列* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindEmail(@Qualifier("emailQueue") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(rabbitMqMsgProperties.getEmailRouteKey());}/*** 绑定微信队列* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindWx(@Qualifier("wxQueue") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(rabbitMqMsgProperties.getWxRouteKey());}/*** 绑定上⾏消息队列* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindUpstream(@Qualifier("upstreamQueue") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(rabbitMqMsgProperties.getUpstreamRouteKey());}@Bean@ConditionalOnMissingBean(AliyunAmqpConfig.class)@ConditionalOnBean(RabbitAutoConfiguration.class)@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(@Qualifier("rabbitConnectionFactory") CachingConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMandatory(true);template.setConfirmCallback(new RabbitMqMsgConfirmCallback());template.setReturnCallback(new RabbitMqMsgReturnCallback());return template;}@Bean@ConditionalOnBean(AliyunAmqpConfig.class)@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate aliRabbitTemplate(@Qualifier("aliRabbitConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMandatory(true);template.setConfirmCallback(new RabbitMqMsgConfirmCallback());template.setReturnCallback(new RabbitMqMsgReturnCallback());return template;}}controller 类简易代码/*** @desc:短信控制类* @author:* @date: 2020/7/20 14:33* @version: 3.0.0* @since: 3.0.0*/@Api(tags = "消息controller")@RestController@RefreshScope@RequestMapping("/msg/api/sms")public class SmsController {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate RabbitMqMsgProperties rabbitMqMsgProperties;@ApiOperation(value = "短信发送接⼝")@PostMapping(value = "/send")public Boolean send(@RequestBody @Validated({ValidGroupMsgSend.class}) MsgSendDTO msgSendDTO) throws JsonProcessingException { // msgSendDTO 对象中是要发送给mq中的信息此处省略处理Message message = MessageBuilder.withBody(JsonUtils.toText(msgSendDTO).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();// 全局唯⼀CorrelationData correlationData = new CorrelationData(new SnowFlakeIdGenerator().newId() +"");rabbitTemplate.convertAndSend(rabbitMqMsgProperties.getTopicExchangeName(),rabbitMqMsgProperties.getSmsRouteKey(),message,correlationData);return true;}}Consumer接收mq对应队列信息pom包⽂件<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId></dependency><dependency><groupId>com.alibaba.mq-amqp</groupId><artifactId>mq-amqp-client</artifactId><version>1.0.5</version></dependency>配置⽂件信息# rabbitMq 的相关配置rabbitmq:host: 192.168.118.160port: 5672username: adminpassword: adminlistener:simple:acknowledge-mode: manualconcurrency: 1 # 并发线程default-requeue-rejected: false ⼯具类JsonUtils/*** JSON处理辅助功能*/public final class JsonUtils {/*** MAP对象类型*/private static final MapType MAP_TYPE;/*** MAP对象类型*/private static final CollectionType LIST_TYPE;/*** 默认JSON对象映射器*/private static ObjectMapper defaultMapper;// 静态变量初始化static {MAP_TYPE = TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, Object.class); LIST_TYPE = TypeFactory.defaultInstance().constructCollectionType(ArrayList.class, MAP_TYPE);defaultMapper = new ObjectMapper();defaultMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);defaultMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);defaultMapper.enable(JsonParser.Feature.ALLOW_MISSING_VALUES);defaultMapper.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);defaultMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);JavaTimeModule javaTimeModule = new JavaTimeModule();javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());defaultMapper.registerModule(javaTimeModule);}/*** 构造⽅法(静态类禁⽌创建)*/private JsonUtils() {}/*** 对象输出JSON⽂本** @param out 输出* @param object 对象*/public static void toOutput(OutputStream out, Object object) {if (object == null) {return;}try {defaultMapper.writeValue(out, object);} catch (IOException e) {throw new RuntimeException(e);}}/*** 对象转换为JSON⽂本** @param object 对象* @return String JSON⽂本*/public static String toText(Object object) {if (object == null) {return null;}try {return defaultMapper.writeValueAsString(object);} catch (JsonProcessingException e) {e.printStackTrace();throw new RuntimeException(e);}}/*** JSON⽂本转换为对象** @param <T> 类型* @param jsonText JSON⽂本* @param cls 类型* @return T 数据对象*/public static <T> T toObject(String jsonText, Class<T> cls) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText, cls);} catch (IOException e) {throw new RuntimeException(e);}}/*** JSON⽂本转换为对象** @param jsonText JSON⽂本* @return Map*/public static Map<String, Object> toMap(String jsonText) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText, MAP_TYPE);} catch (IOException e) {throw new RuntimeException(e);}}/*** JSON⽂本转换为对象** @param <T> 类型* @param jsonText JSON⽂本* @param cls 类型* @return Map*/public static <T> Map<String, T> toMap(String jsonText, Class<T> cls) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText,TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, cls));} catch (IOException e) {throw new RuntimeException(e);}}/*** JSON⽂本转换为列表** @param jsonText JSON⽂本* @return List<Map>*/public static List<Map<String, Object>> toList(String jsonText) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText, LIST_TYPE);} catch (IOException e) {throw new RuntimeException(e);}}/*** JSON⽂本转换为列表** @param <T> 类型* @param jsonText JSON⽂本* @param cls 类型* @return List<T> 数据列表*/public static <T> List<T> toList(String jsonText, Class<T> cls) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText,TypeFactory.defaultInstance().constructCollectionType(ArrayList.class, cls));} catch (IOException e) {throw new RuntimeException(e);}}}Consumer接收类@Slf4j@Componentpublic class MsgSmsConsumer {@Resourceprivate SmsProcessServiceImpl smsProcessServiceImpl;/*** 短信消息处理** @param msgContent 消息内容* @param message 消息*/@RabbitListener(queues = "msg.sms.send")@RabbitHandlerpublic void smsProcess(String msgContent, Message message, Channel channel) throws IOException {// 转换消息MsgSendDTO msgSendDTO = JsonUtils.toObject(msgContent, MsgSendDTO.class);boolean ack = true;BusinessException bizException = null;try {if (!ObjectUtils.isEmpty(msgSendDTO)) {("收到[{}]消息[{}]", msgCategory, JsonUtils.toText(msgSendDTO));boolean sendRet = smsProcessServiceImpl.msgSend(msgSendDTO); //处理之后业务if (!sendRet) {throw new BusinessException(MsgExceptionRetCode.MSG_SEND_FAILED,MsgExceptionRetCode.MSG_SEND_FAILED.getMsg());}}}} catch (BusinessException e) {log.error(e.getMessage());ack = false;bizException = e;}if (!ack) {log.error("[{}]消息消费发⽣异常,错误信息:[{}]", msgCategory, bizException.getMessage(), bizException); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}/*** sms死信消息处理** @param msgContent 消息内容* @param message 消息*/@RabbitListener(queues = "msg.sms.dead.send")@RabbitHandlerpublic void smsDeadProcess(String msgContent, Message message, Channel channel) throws IOException { ("收到[sms]死信消息:[{}]", msgContent);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}详情介绍可参考他⼈链接:。
vue连接rabbitmq原理概述及解释说明1. 引言1.1 概述本篇文章旨在介绍vue连接rabbitmq的原理,并对其进行解释和说明。
在当今互联网时代,消息队列的概念越来越被广泛应用于各种大型分布式系统中,而RabbitMQ作为一个强大的开源消息队列服务提供商,在vue项目中连接RabbitMQ可以实现异步通信、解耦和提高系统可靠性等优势。
本文将从vue 的基本原理和RabbitMQ的概念入手,详细介绍vue如何与RabbitMQ建立连接以及实现消息的发送和接收功能。
1.2 文章结构文章主要分为五个部分:引言、vue连接rabbitmq原理、连接过程详解、原理解释与示例代码演示和结论与展望。
- 引言部分将简要介绍本文内容以及目的。
- Vue连接RabbitMQ原理部分将通过对vue的基本原理和RabbitMQ的简介,阐明了为什么选择使用Vue作为前端框架,并且通过RabbitMQ帮助我们实现消息队列功能。
- 连接过程详解部分将具体说明Vue中使用AMQP.js库连接RabbitMQ所需的步骤,并介绍如何配置相关参数,以及如何实现消息队列的发送和接收功能。
- 原理解释与示例代码演示部分将详细讲解Vue如何通过AMQP.js建立与RabbitMQ的连接通道,以及RabbitMQ消息队列是如何实现消息传递的。
同时,通过示例代码演示Vue连接RabbitMQ并实现消息发布和订阅功能的过程。
- 结论与展望部分将对本文进行总结,并对未来可能的研究方向进行展望。
1.3 目的本文的目标是帮助读者了解Vue连接RabbitMQ的原理,并能够在实际应用中灵活运用该技术。
通过深入剖析Vue和RabbitMQ之间的连接方法以及具体实现步骤,读者将能够更加清晰地掌握该技术并在自己的项目中应用。
最终,读者将能够借助这种技术提高系统性能、可靠性和可扩展性,并为他们今后从事相关工作提供指导和参考。
2. vue连接rabbitmq原理:Vue是一款流行的前端框架,它采用了MVVM(Model-View-ViewModel)的设计模式,并且具有响应式数据绑定和组件化的特点。
orange练习题一、基础概念理解1. 请简述数据挖掘的基本任务。
2. 解释什么是数据仓库及其在数据挖掘中的作用。
3. 描述决策树算法的基本原理。
4. 请说明Kmeans聚类算法的步骤。
5. 解释关联规则挖掘中的支持度、置信度和提升度。
6. 请阐述贝叶斯分类器的原理。
7. 说明遗传算法在数据挖掘中的应用。
8. 描述文本挖掘的主要技术和应用领域。
9. 请简述时间序列分析的基本方法。
10. 解释什么是集成学习及其优势。
二、Python编程基础1. 编写Python代码,实现一个简单的线性回归模型。
2. 使用Python编写代码,实现Kmeans聚类算法。
3. 编写代码,使用决策树算法对鸢尾花数据集进行分类。
4. 使用Python实现Apriori算法进行关联规则挖掘。
5. 编写代码,使用朴素贝叶斯分类器对文本数据进行分类。
6. 使用Python实现一个简单的神经网络模型。
7. 编写代码,使用随机森林算法对数据集进行分类。
8. 使用Python实现Adaboost算法。
9. 编写代码,使用KNN算法对数据集进行分类。
10. 实现一个基于Python的决策树可视化工具。
三、数据预处理1. 编写代码,实现数据标准化处理。
2. 编写代码,实现数据归一化处理。
3. 请描述数据缺失值处理的常见方法。
4. 编写代码,实现数据缺失值的填充。
5. 请简述数据倾斜的解决方法。
6. 编写代码,实现数据去重。
7. 请描述如何处理数据中的异常值。
8. 编写代码,实现数据集的划分(训练集和测试集)。
9. 请简述特征选择的方法。
10. 编写代码,实现特征选择。
四、模型评估与优化1. 请解释交叉验证的原理。
2. 编写代码,实现交叉验证。
3. 请描述混淆矩阵的概念。
4. 编写代码,计算混淆矩阵。
5. 请解释准确率、精确率、召回率和F1值的概念。
6. 编写代码,计算准确率、精确率、召回率和F1值。
7. 请描述过拟合和欠拟合的概念。
8. 编写代码,实现模型的过拟合和欠拟合检测。
详解PythonCelery和RabbitMQ实战教程前⾔是⼀个异步任务队列。
它可以⽤于需要异步运⾏的任何内容。
RabbitMQ是Celery⼴泛使⽤的消息代理。
在本这篇⽂章中,我将使⽤RabbitMQ来介绍Celery的基本概念,然后为⼀个⼩型演⽰项⽬设置Celery 。
最后,设置⼀个Celery Web控制台来监视我的任务基本概念 来!看图说话:BrokerBroker(RabbitMQ)负责创建任务队列,根据⼀些路由规则将任务分派到任务队列,然后将任务从任务队列交付给workerConsumer (Celery Workers)Consumer是执⾏任务的⼀个或多个Celery workers。
可以根据⽤例启动许多workersResult Backend后端⽤于存储任务的结果。
但是,它不是必需的元素,如果不在设置中包含它,就⽆法访问任务的结果安装Celery ⾸先,需要安装好Celery,可以使⽤:pip install celery选择⼀个Broker:RabbitMQ 为什么我们需要broker呢?这是因为Celery本⾝并不构造消息队列,所以它需要⼀个额外的消息传输来完成这项⼯作。
这⾥可以将Celery看作消息代理的包装器实际上,也可以从⼏个不同的代理中进⾏选择,⽐如RabbitMQ、Redis或数据库(例如Django数据库)在这⾥使⽤RabbitMQ作为代理,因为它功能完整、稳定,Celery推荐使⽤它。
由于演⽰我的环境是在Mac OS中,安装RabbitMQ使⽤Homebrew即可:brew install rabbitmq#如果是Ubuntu的话使⽤apt-get安装启动RabbitMQ 程序将在/usr/local/sbin中安装RabbitMQ,虽然有些系统可能会有所不同。
可以将此路径添加到环境变量路径,以便以后⽅便地使⽤。
例如,打开shell启动⽂件~/.bash_profile添加:PATH=$PATH:/usr/local/sbin现在,可以使⽤rabbitmq-server命令启动我们的RabbitMQ服务器。
RabbitMQ 七种队列模式应用场景案例分析(通俗易懂)七种模式介绍与应用场景简单模式(Hello World)做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人工作队列模式(Work queues)在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理应用场景:一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况订阅模式(Publish/Subscribe)一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。
应用场景:更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:•一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列•一个缓存消息队列对应着多个缓存消费者•一个数据库消息队列对应着多个数据库消费者路由模式(Routing)有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息应用场景:如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息主题模式(Topics)根据主题(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。
应用场景:同上,iphone促销活动可以接收主题为iphone的消息,如iphone12、iphone13等远程过程调用(RPC)如果我们需要在远程计算机上运行功能并等待结果就可以使用RPC,具体流程可以看图。
rabbitmq应用实例RabbitMQ应用实例RabbitMQ是一种开源的消息代理软件,广泛应用于分布式系统中,用于在不同应用程序之间传递消息。
它采用AMQP协议,提供了可靠的消息传递机制,能够确保消息的可靠性和顺序性。
下面我们来看几个RabbitMQ的应用实例。
1. 订单处理系统假设有一个电商网站,用户下单后需要进行订单处理。
在这个过程中,需要将订单信息传递给库存系统、支付系统和物流系统等。
这时就可以利用RabbitMQ来实现不同系统之间的消息传递。
当用户下单时,订单系统将订单信息发送到RabbitMQ,其他系统订阅相应的消息队列,从而实现订单信息的同步处理。
2. 日志收集系统在一个分布式系统中,各个节点会产生大量的日志信息。
为了方便管理和分析这些日志,可以使用RabbitMQ来搭建日志收集系统。
每个节点将日志信息发送到RabbitMQ的消息队列中,然后日志收集服务订阅这些消息队列,将日志信息汇总到中心服务器进行存储和分析。
3. 实时数据处理系统在一些实时数据处理场景中,比如金融交易系统、在线游戏等,需要对数据进行实时处理和分发。
RabbitMQ可以作为数据流处理的中间件,将数据发送到不同的处理节点进行处理。
通过RabbitMQ的消息队列机制,可以实现数据的实时传输和处理,确保系统的高可用性和可靠性。
4. 任务调度系统在一些任务调度场景中,比如定时任务、异步任务等,可以使用RabbitMQ来实现任务的调度和执行。
任务调度系统将任务信息发送到RabbitMQ的消息队列中,工作节点订阅消息队列并执行相应的任务。
通过RabbitMQ的消息确认机制,可以确保任务的可靠执行,避免任务丢失或重复执行的情况。
总结通过以上几个应用实例,我们可以看到RabbitMQ在分布式系统中的重要作用。
它不仅可以实现不同系统之间的消息传递,还可以提高系统的可靠性和可扩展性。
因此,在设计分布式系统时,可以考虑使用RabbitMQ来解决消息传递的问题,提升系统的性能和稳定性。
rabbitmq的心跳机制执行过程案例和代码java由于rabbitmq是一个开源的消息代理软件,它实现了高级消息排队协议(AMQP)的标准,因此它的心跳机制对于保证消息队列的稳定和可靠性至关重要。
本文将详细探讨rabbitmq的心跳机制执行过程,并分析其案例以及使用Java实现的代码。
通过全面的评估和讨论,希望能够让读者对rabbitmq的心跳机制有深入的理解。
1. rabbitmq的心跳机制简介rabbitmq的心跳机制是用来检测连接是否仍然活跃的一种机制。
在网络出现异常或者连接中断的情况下,心跳机制可以检测出并及时处理,保证了消息队列的稳定性。
心跳机制的执行过程涉及到客户端和服务器端之间的相互通信,其实现机制复杂而严谨。
2. rabbitmq心跳机制的执行过程在rabbitmq的心跳机制执行过程中,首先是客户端向服务器端发送心跳包,服务器端收到后会进行响应,之后再由服务器端向客户端发送心跳包,客户端收到后同样进行响应。
这样一来,就形成了一种持续的心跳通信,可以及时发现连接异常,并进行处理。
这一机制保证了消息队列的稳定和可靠性。
3. rabbitmq心跳机制的案例分析以一个实际的案例来分析rabbitmq的心跳机制执行过程。
假设在一个分布式系统中,多个节点之间通过rabbitmq进行消息通信。
当其中一个节点出现网络故障,导致与rabbitmq的连接中断时,心跳机制会发挥作用,及时地发现连接异常并进行处理,从而保证了整个系统的稳定性和可靠性。
4. rabbitmq心跳机制的Java实现在Java中,我们可以使用rabbitmq的客户端库来实现心跳机制。
通过设置相应的参数和监听器,我们可以很方便地实现心跳机制,并在连接异常时进行相应的处理。
下面是一个简单的使用Java实现rabbitmq心跳机制的示例代码。
```java// 设置连接工厂ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();// 设置连接参数connection.aBlockedListener(reason -> {// 处理连接被阻塞的情况System.out.println("Connection blocked: " + reason);});connection.aShutdownListener(reason -> {// 处理连接关闭的情况System.out.println("Connection shutdown: " + reason); });```通过以上示例代码,我们可以看到如何使用Java来实现rabbitmq的心跳机制,并在连接被阻塞或关闭时进行相应的处理。