RabbitMQ的实战应用
- 格式:pptx
- 大小:719.15 KB
- 文档页数:31
python实现RabbitMQ同步跟异步消费模型1,消息推送类1 import pika234# 同步消息推送类5 class RabbitPublisher(object):67# 传⼊RabbitMQ的ip,⽤户名,密码,实例化⼀个管道8 def __init__(self, host, user, password):9 self.host = host10 er = user11 self.password = password12 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=pika.PlainCredentials(er, self.password)))13 self.channel = self.connection.channel()1415# 发送消息在队列中16 def send(self, queue_name, body):17 self.channel.queue_declare(queue=queue_name, durable=True) # 声明⼀个持久化队列18 self.channel.basic_publish(exchange='',19 routing_key=queue_name, # 队列名字20 body=body, # 消息内容21 properties=pika.BasicProperties(22 delivery_mode=2, # 消息持久化23 ))2425# 清除指定队列的所有的消息26 def purge(self, queue_name):27 self.channel.queue_purge(queue_name)2829# 删除指定队列30 def delete(self, queue_name, if_unused=False, if_empty=False):31 self.channel.queue_delete(queue_name, if_unused=if_unused, if_empty=if_empty)3233# 断开连接34 def stop(self):35 self.connection.close()View Code2.消息消费类(1)同步消息消费在同步消息消费的时候可能会出现pika库断开的情况,原因是因为pika客户端没有及时发送⼼跳,连接就被server端断开了。
基于公有云的RabbitMQ双向数据同步方案一、测试环境阿里云MQ:172.16.130.204(master) / 172.16.128.63(slave)微软云MQ:172.16.192.9(master) / 172.16.192.28(slave)同步组件:RabbitMQ federation-upstream同步方式:双向同步二、方案目标及介绍验证基于不同公有云的数据中心,支持MQ消息队列的业务数据同步方案。
下面配置按先后顺序,分别在阿里云MQ节点和微软云MQ节点进行配置,完成后实现两地消息生产者产生消息会在本地留存一份,并立即转发到对端(各自上游MQ节点),使两地消费者可以及时读取全部消息。
三、阿里云MQ1.创建federation-upstreamURL:amqp://admin:************.192.9(定义上游为微软云香港MQ节点)Reconnect Delay:5sAck mode:on-confirm目的是将上游的微软云香港MQ节点收到的信息,推送到当前MQ节点(下游)。
2.创建exchange3.创建queue4.绑定exchange、queue5.创建policy这条策略是为了在当前阿里云MQ,匹配所有以dual开头的exchange、queue,按federation-upstream定义执行。
策略配置完成即生效,Federation会自动在上游微软云MQ节点完成下列操作:a.创建同名交换器dual.exchangeb.创建federation类型交换器:federation: dual.exchange -> rabbit@iz2zegotcocceakkehlbi3z Bc.创建同名队列dual.queued.创建federation类型队列:federation: dual.exchange -> rabbit@iz2zegotcocceakkehlbi3ze.绑定dual.exchange和federation类型交换器f.绑定federation类型交换器和federation类型队列g.配置federation类型队列的消费者:阿里云MQ节点6.检查连接状态阿里云端MQ的federation中,同步策略运行正常微软云香港MQ的connections中,阿里云MQ服务器已连接成功。
SpringBoot缓存实战Caffeine⽰例Caffeine和Spring Boot集成Caffeine是使⽤Java8对Guava缓存的重写版本,在Spring Boot 2.0中将取代Guava。
如果出现Caffeine,CaffeineCacheManager将会⾃动配置。
使⽤spring.cache.cache-names属性可以在启动时创建缓存,并可以通过以下配置进⾏⾃定义(按顺序):spring.cache.caffeine.spec:定义的特殊缓存com.github.benmanes.caffeine.cache.CaffeineSpec: bean定义com.github.benmanes.caffeine.cache.Caffeine: bean定义例如,以下配置创建⼀个foo和bar缓存,最⼤数量为500,存活时间为10分钟:spring.cache.cache-names=foo,barspring.cache.caffeine.spec=maximumSize=500,expireAfterAccess=600s除此之外,如果定义了com.github.benmanes.caffeine.cache.CacheLoader,它会⾃动关联到CaffeineCacheManager。
由于该CacheLoader将关联被该缓存管理器管理的所有缓存,所以它必须定义为CacheLoader<Object, Object>,⾃动配置将忽略所有泛型类型。
引⼊依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-cache</artifactId></dependency><dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>2.6.0</version></dependency>开启缓存的⽀持使⽤@EnableCaching注解让Spring Boot开启对缓存的⽀持@SpringBootApplication@EnableCaching// 开启缓存,需要显⽰的指定public class SpringBootStudentCacheCaffeineApplication {public static void main(String[] args) {SpringApplication.run(SpringBootStudentCacheCaffeineApplication.class, args);}}配置⽂件新增对缓存的特殊配置,如最⼤容量、过期时间等spring.cache.cache-names=peoplespring.cache.caffeine.spec=initialCapacity=50,maximumSize=500,expireAfterWrite=10s,refreshAfterWrite=5s如果使⽤了refreshAfterWrite配置还必须指定⼀个CacheLoader,如:/*** 必须要指定这个Bean,refreshAfterWrite=5s这个配置属性才⽣效** @return*/@Beanpublic CacheLoader<Object, Object> cacheLoader() {CacheLoader<Object, Object> cacheLoader = new CacheLoader<Object, Object>() {@Overridepublic Object load(Object key) throws Exception {return null;}// 重写这个⽅法将oldValue值返回回去,进⽽刷新缓存@Overridepublic Object reload(Object key, Object oldValue) throws Exception {return oldValue;}};return cacheLoader;}Caffeine配置说明:1. initialCapacity=[integer]: 初始的缓存空间⼤⼩2. maximumSize=[long]: 缓存的最⼤条数3. maximumWeight=[long]: 缓存的最⼤权重4. expireAfterAccess=[duration]: 最后⼀次写⼊或访问后经过固定时间过期5. expireAfterWrite=[duration]: 最后⼀次写⼊后经过固定时间过期6. refreshAfterWrite=[duration]: 创建缓存或者最近⼀次更新缓存后经过固定的时间间隔,刷新缓存7. weakKeys: 打开key的弱引⽤8. weakValues:打开value的弱引⽤9. softValues:打开value的软引⽤10. recordStats:开发统计功能注意:1. expireAfterWrite和expireAfterAccess同事存在时,以expireAfterWrite为准。
SpringBoot整合RabbitMQ,简易的队列发送短信实例在这个界⾯⾥⾯我们可以做些什么?可以⼿动创建虚拟host,创建⽤户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。
⾸先先介绍⼀个简单的⼀个消息推送到接收的流程,提供⼀个简单的图:黄⾊的圈圈就是我们的消息推送服务,将消息推送到中间⽅框⾥⾯也就是 rabbitMq的服务器,然后经过服务器⾥⾯的交换机、队列等各种关系(后⾯会详细讲)将数据处理⼊列后,最终右边的蓝⾊圈圈消费者获取对应监听的消息。
rabbitMq简单编码 (实例:发送短信)⾸先创建 rabbitmq-provider,pom.xml⾥⽤到的jar依赖:<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId></dependency><dependency><groupId>com.alibaba.mq-amqp</groupId><artifactId>mq-amqp-client</artifactId><version>1.0.5</version></dependency>application.yml 配置## 配置rabbitMQ 信息rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest# 开启发送确认publisher-confirms: true# 开启发送失败退回publisher-returns: true# 消息 rabbitmq 的⾃定义相关配置rabbit:msg:exchange:fanout:name: msg_fanout_exchangetopic:name: msg_topic_exchangealternate:name: msg_alternate_exchangedead:name: msg_dead_exchangequeue:sms:name: msg.sms.senddead:name: msg.sms.dead.sendupstream:name: msg.sms.upstreamalternate:name: msg.alternateroute:sms: msg.sms.sendupstream: msg.sms.upstream消息 rabbitMq 属性配置配置交换机,并绑定/*** @desc:消息 rabbitMq 属性配置* @author:* @date: 2020/6/24 15:40* @version: 3.0.0* @since: 3.0.0*/@RefreshScope@Componentpublic class RabbitMqMsgProperties {// 扇形交换机名称@Value("${}")private String fanoutExchangeName;// 备份换机名称@Value("${}")private String alternateExchangeName;// TOPIC换机名称@Value("${}")private String topicExchangeName;// 消息死信交换机名称@Value("${}")private String deadExchangeName;// 备份队列名称@Value("${}")private String alternateQueueName;// 短信消息队列名称@Value("${}")private String smsQueueName;// 短信消息死信队列名称@Value("${}")private String smsDeadQueueName;// 邮件消息队列名称@Value("${}")private String emailQueueName;// 邮件消息死信队列名称@Value("${}")private String emailDeadQueueName;// 上⾏消息队列名称@Value("${}")private String upstreamQueueName;// 短信消息路由键@Value("${rabbit.msg.route.sms}")private String smsRouteKey;// 邮件消息路由键@Value("${rabbit.msg.route.email}")private String emailRouteKey;// 上⾏消息路由键@Value("${rabbit.msg.route.upstream}")private String upstreamRouteKey;// 微信消息队列名称@Value("${}")private String wxQueueName;// 微信消息路由键@Value("${rabbit.msg.route.wx}")private String wxRouteKey;// 微信消息死信队列名称@Value("${}")private String wxDeadQueueName;public String getFanoutExchangeName() {return fanoutExchangeName;}public void setFanoutExchangeName(String fanoutExchangeName) { this.fanoutExchangeName = fanoutExchangeName;}public String getSmsQueueName() {return smsQueueName;}public void setSmsQueueName(String smsQueueName) {this.smsQueueName = smsQueueName;}public String getEmailQueueName() {return emailQueueName;}public void setEmailQueueName(String emailQueueName) {this.emailQueueName = emailQueueName;}public String getUpstreamQueueName() {return upstreamQueueName;}public void setUpstreamQueueName(String upstreamQueueName) {this.upstreamQueueName = upstreamQueueName;}public String getTopicExchangeName() {return topicExchangeName;}public void setTopicExchangeName(String topicExchangeName) {this.topicExchangeName = topicExchangeName;}public String getSmsRouteKey() {return smsRouteKey;}public void setSmsRouteKey(String smsRouteKey) {this.smsRouteKey = smsRouteKey;}public String getEmailRouteKey() {return emailRouteKey;}public void setEmailRouteKey(String emailRouteKey) {this.emailRouteKey = emailRouteKey;}public String getUpstreamRouteKey() {return upstreamRouteKey;}public void setUpstreamRouteKey(String upstreamRouteKey) {this.upstreamRouteKey = upstreamRouteKey;}public String getAlternateExchangeName() {return alternateExchangeName;}public void setAlternateExchangeName(String alternateExchangeName) {this.alternateExchangeName = alternateExchangeName;}public String getAlternateQueueName() {return alternateQueueName;}public void setAlternateQueueName(String alternateQueueName) {this.alternateQueueName = alternateQueueName;}public String getSmsDeadQueueName() {return smsDeadQueueName;}public void setSmsDeadQueueName(String smsDeadQueueName) {this.smsDeadQueueName = smsDeadQueueName;}public String getEmailDeadQueueName() {return emailDeadQueueName;}public void setEmailDeadQueueName(String emailDeadQueueName) {this.emailDeadQueueName = emailDeadQueueName;}public String getDeadExchangeName() {return deadExchangeName;}public void setDeadExchangeName(String deadExchangeName) {this.deadExchangeName = deadExchangeName;}public String getWxQueueName() {return wxQueueName;}public void setWxQueueName(String wxQueueName) {this.wxQueueName = wxQueueName;}public String getWxRouteKey() {return wxRouteKey;}public void setWxRouteKey(String wxRouteKey) {this.wxRouteKey = wxRouteKey;}public String getWxDeadQueueName() {return wxDeadQueueName;}public void setWxDeadQueueName(String wxDeadQueueName) {this.wxDeadQueueName = wxDeadQueueName;}}package cn.ygyg.bps.msg.api.conf;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;import javax.annotation.Resource;import java.util.HashMap;import java.util.Map;/*** @desc:消息 rabbitmq 配置类* @author: guanliang.xue* @date: 2020/6/24 15:07* @version: 3.0.0* @since: 3.0.0*/@Configuration@ConditionalOnBean(value = RabbitMqMsgProperties.class)public class RabbitMqMsgConfig {@Resourceprivate RabbitMqMsgProperties rabbitMqMsgProperties;/*** 定义备份交换机* @return 备份交换机*/@Beanpublic FanoutExchange alternateExchange(){return new FanoutExchange(rabbitMqMsgProperties.getAlternateExchangeName());}/*** 定义扇形交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(rabbitMqMsgProperties.getFanoutExchangeName());}/*** 定义TOPIC交换机* @return*/@Beanpublic TopicExchange topicExchange(){Map<String, Object> arguments = new HashMap<>();return new TopicExchange(rabbitMqMsgProperties.getTopicExchangeName(),true,false,arguments); }/*** 死信交换机* @return*/@Beanpublic DirectExchange deadExchange(){return new DirectExchange(rabbitMqMsgProperties.getDeadExchangeName(),true,false);}/*** 备份队列* @return 队列*/@Beanpublic Queue alternateQueue(){return new Queue(rabbitMqMsgProperties.getAlternateQueueName());}/*** 短信队列* @return*/@Beanpublic Queue smsQueue(){Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机// x-dead-letter-routing-key 死信路由key,默认使⽤原有路由keyargs.put("x-dead-letter-exchange", rabbitMqMsgProperties.getDeadExchangeName());return QueueBuilder.durable(rabbitMqMsgProperties.getSmsQueueName()).withArguments(args).build();}/*** 微信队列* @return*/@Beanpublic Queue wxQueue(){Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机// x-dead-letter-routing-key 死信路由key,默认使⽤原有路由keyargs.put("x-dead-letter-exchange", rabbitMqMsgProperties.getDeadExchangeName());return QueueBuilder.durable(rabbitMqMsgProperties.getWxQueueName()).withArguments(args).build();}/*** 邮件队列* @return*/@Beanpublic Queue emailQueue(){Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机// x-dead-letter-routing-key 死信路由key,默认使⽤原有路由keyargs.put("x-dead-letter-exchange", rabbitMqMsgProperties.getDeadExchangeName());return QueueBuilder.durable(rabbitMqMsgProperties.getEmailQueueName()).withArguments(args).build();}/*** sms 死信队列* @return*/@Beanpublic Queue smsDeadQueue(){return new Queue(rabbitMqMsgProperties.getSmsDeadQueueName());}/*** email 死信队列* @return*/@Beanpublic Queue emailDeadQueue(){return new Queue(rabbitMqMsgProperties.getEmailDeadQueueName());}/*** wx 死信队列* @return*/@Beanpublic Queue wxDeadQueue(){return new Queue(rabbitMqMsgProperties.getWxDeadQueueName());}/*** 服务商上⾏队列* @return*/@Beanpublic Queue upstreamQueue(){return new Queue(rabbitMqMsgProperties.getUpstreamQueueName());}/*** 绑定sms死信列到死信交换机* @param queue 死信队列* @return*/@Beanpublic Binding bindSmsDead(@Qualifier("smsDeadQueue") Queue queue,@Qualifier("deadExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(rabbitMqMsgProperties.getSmsRouteKey());}/*** 绑定email死信列到死信交换机* @param queue 死信队列* @return*/@Beanpublic Binding bindEmailDead(@Qualifier("emailDeadQueue") Queue queue,@Qualifier("deadExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(rabbitMqMsgProperties.getEmailRouteKey());}/*** 绑定wx死信列到死信交换机* @param queue 死信队列* @return*/@Beanpublic Binding bindWxDead(@Qualifier("wxDeadQueue") Queue queue,@Qualifier("deadExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(rabbitMqMsgProperties.getWxRouteKey());}/*** 绑定备份列到备份交换机* @param queue 备份队列* @return*/@Beanpublic Binding bindAlternate(@Qualifier("alternateQueue") Queue queue,@Qualifier("alternateExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}/*** 绑定短信队列到TOPIC交换机* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindSms(@Qualifier("smsQueue") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(rabbitMqMsgProperties.getSmsRouteKey());}/*** 绑定邮件队列* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindEmail(@Qualifier("emailQueue") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(rabbitMqMsgProperties.getEmailRouteKey());}/*** 绑定微信队列* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindWx(@Qualifier("wxQueue") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(rabbitMqMsgProperties.getWxRouteKey());}/*** 绑定上⾏消息队列* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindUpstream(@Qualifier("upstreamQueue") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(rabbitMqMsgProperties.getUpstreamRouteKey());}@Bean@ConditionalOnMissingBean(AliyunAmqpConfig.class)@ConditionalOnBean(RabbitAutoConfiguration.class)@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(@Qualifier("rabbitConnectionFactory") CachingConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMandatory(true);template.setConfirmCallback(new RabbitMqMsgConfirmCallback());template.setReturnCallback(new RabbitMqMsgReturnCallback());return template;}@Bean@ConditionalOnBean(AliyunAmqpConfig.class)@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate aliRabbitTemplate(@Qualifier("aliRabbitConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMandatory(true);template.setConfirmCallback(new RabbitMqMsgConfirmCallback());template.setReturnCallback(new RabbitMqMsgReturnCallback());return template;}}controller 类简易代码/*** @desc:短信控制类* @author:* @date: 2020/7/20 14:33* @version: 3.0.0* @since: 3.0.0*/@Api(tags = "消息controller")@RestController@RefreshScope@RequestMapping("/msg/api/sms")public class SmsController {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate RabbitMqMsgProperties rabbitMqMsgProperties;@ApiOperation(value = "短信发送接⼝")@PostMapping(value = "/send")public Boolean send(@RequestBody @Validated({ValidGroupMsgSend.class}) MsgSendDTO msgSendDTO) throws JsonProcessingException { // msgSendDTO 对象中是要发送给mq中的信息此处省略处理Message message = MessageBuilder.withBody(JsonUtils.toText(msgSendDTO).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();// 全局唯⼀CorrelationData correlationData = new CorrelationData(new SnowFlakeIdGenerator().newId() +"");rabbitTemplate.convertAndSend(rabbitMqMsgProperties.getTopicExchangeName(),rabbitMqMsgProperties.getSmsRouteKey(),message,correlationData);return true;}}Consumer接收mq对应队列信息pom包⽂件<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId></dependency><dependency><groupId>com.alibaba.mq-amqp</groupId><artifactId>mq-amqp-client</artifactId><version>1.0.5</version></dependency>配置⽂件信息# rabbitMq 的相关配置rabbitmq:host: 192.168.118.160port: 5672username: adminpassword: adminlistener:simple:acknowledge-mode: manualconcurrency: 1 # 并发线程default-requeue-rejected: false ⼯具类JsonUtils/*** JSON处理辅助功能*/public final class JsonUtils {/*** MAP对象类型*/private static final MapType MAP_TYPE;/*** MAP对象类型*/private static final CollectionType LIST_TYPE;/*** 默认JSON对象映射器*/private static ObjectMapper defaultMapper;// 静态变量初始化static {MAP_TYPE = TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, Object.class); LIST_TYPE = TypeFactory.defaultInstance().constructCollectionType(ArrayList.class, MAP_TYPE);defaultMapper = new ObjectMapper();defaultMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);defaultMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);defaultMapper.enable(JsonParser.Feature.ALLOW_MISSING_VALUES);defaultMapper.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);defaultMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);JavaTimeModule javaTimeModule = new JavaTimeModule();javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());defaultMapper.registerModule(javaTimeModule);}/*** 构造⽅法(静态类禁⽌创建)*/private JsonUtils() {}/*** 对象输出JSON⽂本** @param out 输出* @param object 对象*/public static void toOutput(OutputStream out, Object object) {if (object == null) {return;}try {defaultMapper.writeValue(out, object);} catch (IOException e) {throw new RuntimeException(e);}}/*** 对象转换为JSON⽂本** @param object 对象* @return String JSON⽂本*/public static String toText(Object object) {if (object == null) {return null;}try {return defaultMapper.writeValueAsString(object);} catch (JsonProcessingException e) {e.printStackTrace();throw new RuntimeException(e);}}/*** JSON⽂本转换为对象** @param <T> 类型* @param jsonText JSON⽂本* @param cls 类型* @return T 数据对象*/public static <T> T toObject(String jsonText, Class<T> cls) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText, cls);} catch (IOException e) {throw new RuntimeException(e);}}/*** JSON⽂本转换为对象** @param jsonText JSON⽂本* @return Map*/public static Map<String, Object> toMap(String jsonText) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText, MAP_TYPE);} catch (IOException e) {throw new RuntimeException(e);}}/*** JSON⽂本转换为对象** @param <T> 类型* @param jsonText JSON⽂本* @param cls 类型* @return Map*/public static <T> Map<String, T> toMap(String jsonText, Class<T> cls) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText,TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, cls));} catch (IOException e) {throw new RuntimeException(e);}}/*** JSON⽂本转换为列表** @param jsonText JSON⽂本* @return List<Map>*/public static List<Map<String, Object>> toList(String jsonText) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText, LIST_TYPE);} catch (IOException e) {throw new RuntimeException(e);}}/*** JSON⽂本转换为列表** @param <T> 类型* @param jsonText JSON⽂本* @param cls 类型* @return List<T> 数据列表*/public static <T> List<T> toList(String jsonText, Class<T> cls) {if (jsonText == null || jsonText.isEmpty()) {return null;}try {return defaultMapper.readValue(jsonText,TypeFactory.defaultInstance().constructCollectionType(ArrayList.class, cls));} catch (IOException e) {throw new RuntimeException(e);}}}Consumer接收类@Slf4j@Componentpublic class MsgSmsConsumer {@Resourceprivate SmsProcessServiceImpl smsProcessServiceImpl;/*** 短信消息处理** @param msgContent 消息内容* @param message 消息*/@RabbitListener(queues = "msg.sms.send")@RabbitHandlerpublic void smsProcess(String msgContent, Message message, Channel channel) throws IOException {// 转换消息MsgSendDTO msgSendDTO = JsonUtils.toObject(msgContent, MsgSendDTO.class);boolean ack = true;BusinessException bizException = null;try {if (!ObjectUtils.isEmpty(msgSendDTO)) {("收到[{}]消息[{}]", msgCategory, JsonUtils.toText(msgSendDTO));boolean sendRet = smsProcessServiceImpl.msgSend(msgSendDTO); //处理之后业务if (!sendRet) {throw new BusinessException(MsgExceptionRetCode.MSG_SEND_FAILED,MsgExceptionRetCode.MSG_SEND_FAILED.getMsg());}}}} catch (BusinessException e) {log.error(e.getMessage());ack = false;bizException = e;}if (!ack) {log.error("[{}]消息消费发⽣异常,错误信息:[{}]", msgCategory, bizException.getMessage(), bizException); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}/*** sms死信消息处理** @param msgContent 消息内容* @param message 消息*/@RabbitListener(queues = "msg.sms.dead.send")@RabbitHandlerpublic void smsDeadProcess(String msgContent, Message message, Channel channel) throws IOException { ("收到[sms]死信消息:[{}]", msgContent);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}详情介绍可参考他⼈链接:。
vue连接rabbitmq原理概述及解释说明1. 引言1.1 概述本篇文章旨在介绍vue连接rabbitmq的原理,并对其进行解释和说明。
在当今互联网时代,消息队列的概念越来越被广泛应用于各种大型分布式系统中,而RabbitMQ作为一个强大的开源消息队列服务提供商,在vue项目中连接RabbitMQ可以实现异步通信、解耦和提高系统可靠性等优势。
本文将从vue 的基本原理和RabbitMQ的概念入手,详细介绍vue如何与RabbitMQ建立连接以及实现消息的发送和接收功能。
1.2 文章结构文章主要分为五个部分:引言、vue连接rabbitmq原理、连接过程详解、原理解释与示例代码演示和结论与展望。
- 引言部分将简要介绍本文内容以及目的。
- Vue连接RabbitMQ原理部分将通过对vue的基本原理和RabbitMQ的简介,阐明了为什么选择使用Vue作为前端框架,并且通过RabbitMQ帮助我们实现消息队列功能。
- 连接过程详解部分将具体说明Vue中使用AMQP.js库连接RabbitMQ所需的步骤,并介绍如何配置相关参数,以及如何实现消息队列的发送和接收功能。
- 原理解释与示例代码演示部分将详细讲解Vue如何通过AMQP.js建立与RabbitMQ的连接通道,以及RabbitMQ消息队列是如何实现消息传递的。
同时,通过示例代码演示Vue连接RabbitMQ并实现消息发布和订阅功能的过程。
- 结论与展望部分将对本文进行总结,并对未来可能的研究方向进行展望。
1.3 目的本文的目标是帮助读者了解Vue连接RabbitMQ的原理,并能够在实际应用中灵活运用该技术。
通过深入剖析Vue和RabbitMQ之间的连接方法以及具体实现步骤,读者将能够更加清晰地掌握该技术并在自己的项目中应用。
最终,读者将能够借助这种技术提高系统性能、可靠性和可扩展性,并为他们今后从事相关工作提供指导和参考。
2. vue连接rabbitmq原理:Vue是一款流行的前端框架,它采用了MVVM(Model-View-ViewModel)的设计模式,并且具有响应式数据绑定和组件化的特点。
orange练习题一、基础概念理解1. 请简述数据挖掘的基本任务。
2. 解释什么是数据仓库及其在数据挖掘中的作用。
3. 描述决策树算法的基本原理。
4. 请说明Kmeans聚类算法的步骤。
5. 解释关联规则挖掘中的支持度、置信度和提升度。
6. 请阐述贝叶斯分类器的原理。
7. 说明遗传算法在数据挖掘中的应用。
8. 描述文本挖掘的主要技术和应用领域。
9. 请简述时间序列分析的基本方法。
10. 解释什么是集成学习及其优势。
二、Python编程基础1. 编写Python代码,实现一个简单的线性回归模型。
2. 使用Python编写代码,实现Kmeans聚类算法。
3. 编写代码,使用决策树算法对鸢尾花数据集进行分类。
4. 使用Python实现Apriori算法进行关联规则挖掘。
5. 编写代码,使用朴素贝叶斯分类器对文本数据进行分类。
6. 使用Python实现一个简单的神经网络模型。
7. 编写代码,使用随机森林算法对数据集进行分类。
8. 使用Python实现Adaboost算法。
9. 编写代码,使用KNN算法对数据集进行分类。
10. 实现一个基于Python的决策树可视化工具。
三、数据预处理1. 编写代码,实现数据标准化处理。
2. 编写代码,实现数据归一化处理。
3. 请描述数据缺失值处理的常见方法。
4. 编写代码,实现数据缺失值的填充。
5. 请简述数据倾斜的解决方法。
6. 编写代码,实现数据去重。
7. 请描述如何处理数据中的异常值。
8. 编写代码,实现数据集的划分(训练集和测试集)。
9. 请简述特征选择的方法。
10. 编写代码,实现特征选择。
四、模型评估与优化1. 请解释交叉验证的原理。
2. 编写代码,实现交叉验证。
3. 请描述混淆矩阵的概念。
4. 编写代码,计算混淆矩阵。
5. 请解释准确率、精确率、召回率和F1值的概念。
6. 编写代码,计算准确率、精确率、召回率和F1值。
7. 请描述过拟合和欠拟合的概念。
8. 编写代码,实现模型的过拟合和欠拟合检测。
详解PythonCelery和RabbitMQ实战教程前⾔是⼀个异步任务队列。
它可以⽤于需要异步运⾏的任何内容。
RabbitMQ是Celery⼴泛使⽤的消息代理。
在本这篇⽂章中,我将使⽤RabbitMQ来介绍Celery的基本概念,然后为⼀个⼩型演⽰项⽬设置Celery 。
最后,设置⼀个Celery Web控制台来监视我的任务基本概念 来!看图说话:BrokerBroker(RabbitMQ)负责创建任务队列,根据⼀些路由规则将任务分派到任务队列,然后将任务从任务队列交付给workerConsumer (Celery Workers)Consumer是执⾏任务的⼀个或多个Celery workers。
可以根据⽤例启动许多workersResult Backend后端⽤于存储任务的结果。
但是,它不是必需的元素,如果不在设置中包含它,就⽆法访问任务的结果安装Celery ⾸先,需要安装好Celery,可以使⽤:pip install celery选择⼀个Broker:RabbitMQ 为什么我们需要broker呢?这是因为Celery本⾝并不构造消息队列,所以它需要⼀个额外的消息传输来完成这项⼯作。
这⾥可以将Celery看作消息代理的包装器实际上,也可以从⼏个不同的代理中进⾏选择,⽐如RabbitMQ、Redis或数据库(例如Django数据库)在这⾥使⽤RabbitMQ作为代理,因为它功能完整、稳定,Celery推荐使⽤它。
由于演⽰我的环境是在Mac OS中,安装RabbitMQ使⽤Homebrew即可:brew install rabbitmq#如果是Ubuntu的话使⽤apt-get安装启动RabbitMQ 程序将在/usr/local/sbin中安装RabbitMQ,虽然有些系统可能会有所不同。
可以将此路径添加到环境变量路径,以便以后⽅便地使⽤。
例如,打开shell启动⽂件~/.bash_profile添加:PATH=$PATH:/usr/local/sbin现在,可以使⽤rabbitmq-server命令启动我们的RabbitMQ服务器。
RabbitMQ 七种队列模式应用场景案例分析(通俗易懂)七种模式介绍与应用场景简单模式(Hello World)做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人工作队列模式(Work queues)在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理应用场景:一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况订阅模式(Publish/Subscribe)一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。
应用场景:更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:•一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列•一个缓存消息队列对应着多个缓存消费者•一个数据库消息队列对应着多个数据库消费者路由模式(Routing)有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息应用场景:如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息主题模式(Topics)根据主题(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。
应用场景:同上,iphone促销活动可以接收主题为iphone的消息,如iphone12、iphone13等远程过程调用(RPC)如果我们需要在远程计算机上运行功能并等待结果就可以使用RPC,具体流程可以看图。
rabbitmq应用实例RabbitMQ应用实例RabbitMQ是一种开源的消息代理软件,广泛应用于分布式系统中,用于在不同应用程序之间传递消息。
它采用AMQP协议,提供了可靠的消息传递机制,能够确保消息的可靠性和顺序性。
下面我们来看几个RabbitMQ的应用实例。
1. 订单处理系统假设有一个电商网站,用户下单后需要进行订单处理。
在这个过程中,需要将订单信息传递给库存系统、支付系统和物流系统等。
这时就可以利用RabbitMQ来实现不同系统之间的消息传递。
当用户下单时,订单系统将订单信息发送到RabbitMQ,其他系统订阅相应的消息队列,从而实现订单信息的同步处理。
2. 日志收集系统在一个分布式系统中,各个节点会产生大量的日志信息。
为了方便管理和分析这些日志,可以使用RabbitMQ来搭建日志收集系统。
每个节点将日志信息发送到RabbitMQ的消息队列中,然后日志收集服务订阅这些消息队列,将日志信息汇总到中心服务器进行存储和分析。
3. 实时数据处理系统在一些实时数据处理场景中,比如金融交易系统、在线游戏等,需要对数据进行实时处理和分发。
RabbitMQ可以作为数据流处理的中间件,将数据发送到不同的处理节点进行处理。
通过RabbitMQ的消息队列机制,可以实现数据的实时传输和处理,确保系统的高可用性和可靠性。
4. 任务调度系统在一些任务调度场景中,比如定时任务、异步任务等,可以使用RabbitMQ来实现任务的调度和执行。
任务调度系统将任务信息发送到RabbitMQ的消息队列中,工作节点订阅消息队列并执行相应的任务。
通过RabbitMQ的消息确认机制,可以确保任务的可靠执行,避免任务丢失或重复执行的情况。
总结通过以上几个应用实例,我们可以看到RabbitMQ在分布式系统中的重要作用。
它不仅可以实现不同系统之间的消息传递,还可以提高系统的可靠性和可扩展性。
因此,在设计分布式系统时,可以考虑使用RabbitMQ来解决消息传递的问题,提升系统的性能和稳定性。
rabbitmq的心跳机制执行过程案例和代码java由于rabbitmq是一个开源的消息代理软件,它实现了高级消息排队协议(AMQP)的标准,因此它的心跳机制对于保证消息队列的稳定和可靠性至关重要。
本文将详细探讨rabbitmq的心跳机制执行过程,并分析其案例以及使用Java实现的代码。
通过全面的评估和讨论,希望能够让读者对rabbitmq的心跳机制有深入的理解。
1. rabbitmq的心跳机制简介rabbitmq的心跳机制是用来检测连接是否仍然活跃的一种机制。
在网络出现异常或者连接中断的情况下,心跳机制可以检测出并及时处理,保证了消息队列的稳定性。
心跳机制的执行过程涉及到客户端和服务器端之间的相互通信,其实现机制复杂而严谨。
2. rabbitmq心跳机制的执行过程在rabbitmq的心跳机制执行过程中,首先是客户端向服务器端发送心跳包,服务器端收到后会进行响应,之后再由服务器端向客户端发送心跳包,客户端收到后同样进行响应。
这样一来,就形成了一种持续的心跳通信,可以及时发现连接异常,并进行处理。
这一机制保证了消息队列的稳定和可靠性。
3. rabbitmq心跳机制的案例分析以一个实际的案例来分析rabbitmq的心跳机制执行过程。
假设在一个分布式系统中,多个节点之间通过rabbitmq进行消息通信。
当其中一个节点出现网络故障,导致与rabbitmq的连接中断时,心跳机制会发挥作用,及时地发现连接异常并进行处理,从而保证了整个系统的稳定性和可靠性。
4. rabbitmq心跳机制的Java实现在Java中,我们可以使用rabbitmq的客户端库来实现心跳机制。
通过设置相应的参数和监听器,我们可以很方便地实现心跳机制,并在连接异常时进行相应的处理。
下面是一个简单的使用Java实现rabbitmq心跳机制的示例代码。
```java// 设置连接工厂ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();// 设置连接参数connection.aBlockedListener(reason -> {// 处理连接被阻塞的情况System.out.println("Connection blocked: " + reason);});connection.aShutdownListener(reason -> {// 处理连接关闭的情况System.out.println("Connection shutdown: " + reason); });```通过以上示例代码,我们可以看到如何使用Java来实现rabbitmq的心跳机制,并在连接被阻塞或关闭时进行相应的处理。
RabbitMQ实战管理界⾯⼀、项⽬⽰例1、appsettings.json"Service": { "RabbitMQ": {"Enabled": true,"Conn": "host=47.94.211.196;virtualHost=/;username=admin;password=admin;timeout=60"}}2、读取配置⽂件这⾥只是简单的说明下,部分代码:///<summary>/// RabbitMQ 启⽤///</summary>public static bool? RabbitMQ_Enabled = Appsettings.GetApp(new string[] { "Service", "RabbitMQ", "Enabled" })?.ToBool();///<summary>/// RabbitMQ 虚拟主机///</summary>public static string RabbitMQ_Conn = Appsettings.GetApp(new string[] { "Service", "RabbitMQ", "Conn" });3、EasyNetQ服务这⾥使⽤的是EasyNetQ中间件,源码已经在github中公开,主要提供了对RabbitMQ操作的API。
项⽬引⼊EasyNetQ:4、Startup中添加RabbitMQ服务扩展⽅法://rabbitmq消息队列services.AddRabbitMQService();具体的扩展⽅法:namespace CommonCore.Helpers{static partial class Extention{public static IServiceCollection AddRabbitMQService(this IServiceCollection services){if (services == null) throw new ArgumentNullException(nameof(services));if ((bool)AppsettingsMap.RabbitMQ_Enabled){services.AddSingleton(RabbitHutch.CreateBus(AppsettingsMap.RabbitMQ_Conn));}return services;}}}5、注⼊using EasyNetQ;using EasyNetQ.Topology;public class BaseProvider{ public BaseProvider(IBus bus, ILogger logger) { _logger = logger; _bus = bus; } private readonly IBus _bus; protected ILogger _logger;}IExchange exchange = await _bus.Advanced.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct); await _bus.Advanced.PublishAsync(exchange, server, false, new MessageProperties() { DeliveryMode = 2 }, SerializeHelper.SerializeToBytes(new MessageOptions{}));⼆、RabbitMQ管理界⾯打开RabbitMQ后台界⾯,如上图。
航空行业飞行训练与维护管理系统方案第1章引言 (3)1.1 背景与意义 (3)1.2 系统目标与范围 (4)第2章行业现状与需求分析 (4)2.1 航空行业现状 (4)2.2 飞行训练需求分析 (4)2.3 维护管理需求分析 (5)第3章系统设计原则与框架 (5)3.1 设计原则 (5)3.1.1 实用性原则 (5)3.1.2 可靠性原则 (5)3.1.3 安全性原则 (5)3.1.4 可扩展性原则 (5)3.1.5 易维护性原则 (6)3.1.6 高效性原则 (6)3.2 系统框架 (6)3.2.1 总体框架 (6)3.2.2 技术框架 (6)3.2.3 网络框架 (7)第4章飞行训练模块设计 (7)4.1 飞行员信息管理 (7)4.1.1 飞行员基本信息管理 (7)4.1.2 资质认证管理 (7)4.1.3 飞行经历管理 (7)4.1.4 考核记录管理 (7)4.2 训练计划与课程安排 (7)4.2.1 训练计划管理 (7)4.2.2 课程安排管理 (8)4.2.3 进度跟踪与评估 (8)4.3 教练与教学资源管理 (8)4.3.1 教练员信息管理 (8)4.3.2 教学资源管理 (8)4.3.3 教练员排班管理 (8)第5章维护管理模块设计 (8)5.1 飞机基本信息管理 (8)5.1.1 功能需求 (8)5.1.2 系统设计 (9)5.2 维护计划与工单管理 (9)5.2.1 功能需求 (9)5.2.2 系统设计 (9)5.3 部件库存与供应链管理 (9)5.3.2 系统设计 (9)第6章在线训练与评估 (10)6.1 在线理论学习 (10)6.1.1 课程设置 (10)6.1.2 教学方式 (10)6.1.3 教学管理 (10)6.2 模拟飞行训练 (10)6.2.1 模拟飞行设备 (10)6.2.2 训练内容 (10)6.2.3 训练管理 (10)6.3 飞行技能评估 (11)6.3.1 评估内容 (11)6.3.2 评估方法 (11)6.3.3 评估标准 (11)6.3.4 评估管理 (11)第7章数据分析与决策支持 (11)7.1 数据收集与处理 (11)7.1.1 数据收集 (11)7.1.2 数据处理 (12)7.2 飞行训练数据分析 (12)7.2.1 飞行员操作技能分析 (12)7.2.2 训练课程效果分析 (12)7.2.3 飞行员心理素质分析 (12)7.3 维护管理数据分析 (12)7.3.1 飞机维护质量分析 (12)7.3.2 维修成本分析 (12)7.3.3 维修人员绩效分析 (12)第8章系统集成与接口设计 (13)8.1 系统集成方案 (13)8.1.1 系统架构 (13)8.1.2 集成方式 (13)8.1.3 集成技术 (13)8.2 数据接口设计 (13)8.2.1 接口概述 (13)8.2.2 接口规范 (13)8.2.3 接口实现 (13)8.3 硬件设备接口设计 (14)8.3.1 接口概述 (14)8.3.2 接口规范 (14)8.3.3 接口实现 (14)第9章信息安全与系统运维 (14)9.1 信息安全策略 (14)9.1.1 权限管理 (14)9.1.3 防火墙与入侵检测 (15)9.1.4 安全审计 (15)9.2 数据备份与恢复 (15)9.2.1 备份策略 (15)9.2.2 备份类型 (15)9.2.3 恢复策略 (15)9.3 系统运维管理 (15)9.3.1 系统监控 (15)9.3.2 系统升级与维护 (15)9.3.3 运维团队建设 (15)9.3.4 运维管理制度 (15)第10章项目实施与评估 (16)10.1 项目实施计划 (16)10.1.1 项目目标 (16)10.1.2 项目范围 (16)10.1.3 项目组织结构 (16)10.1.4 项目时间表 (16)10.1.5 资源配置 (16)10.1.6 质量控制 (16)10.2 风险评估与应对措施 (16)10.2.1 技术风险 (16)10.2.2 人员风险 (16)10.2.3 资金风险 (16)10.2.4 合同风险 (16)10.2.5 政策风险 (17)10.3 项目评估与优化建议 (17)10.3.1 项目进度评估 (17)10.3.2 项目质量评估 (17)10.3.3 项目成本评估 (17)10.3.4 项目效益评估 (17)10.3.5 项目持续改进 (17)第1章引言1.1 背景与意义航空行业的快速发展,航空公司对飞行安全和效率的要求日益提高。
C#队列学习笔记:RabbitMQ使⽤多线程提⾼消费吞吐率⼀、引⾔使⽤⼯作队列的⼀个好处就是它能够并⾏的处理队列。
如果堆积了很多任务,我们只需要添加更多的⼯作者(workers)就可以了,扩展很简单。
本例使⽤多线程来创建多信道并绑定队列,达到多workers的⽬的。
⼆、⽰例2.1、环境准备在NuGet上安装RabbitMQ.Client。
2.2、⼯⼚类添加⼀个⼯⼚类RabbitMQFactory:///<summary>///多路复⽤技术(Multiplexing)⽬的:为了避免创建多个TCP⽽造成系统资源的浪费和超载,从⽽有效地利⽤TCP连接。
///</summary>public static class RabbitMQFactory{private static IConnection sharedConnection;private static int ChannelCount { get; set; }private static readonly object _locker = new object();public static IConnection SharedConnection{get{if (ChannelCount >= 1000){if (sharedConnection != null && sharedConnection.IsOpen){sharedConnection.Close();}sharedConnection = null;ChannelCount = 0;}if (sharedConnection == null){lock (_locker){if (sharedConnection == null){sharedConnection = GetConnection();ChannelCount++;}}}return sharedConnection;}}private static IConnection GetConnection(){var factory = new ConnectionFactory{HostName = "192.168.2.242",UserName = "hello",Password = "world",Port = eDefaultPort,//5672VirtualHost = ConnectionFactory.DefaultVHost,//使⽤默认值:"/"Protocol = Protocols.DefaultProtocol,AutomaticRecoveryEnabled = true};return factory.CreateConnection();}}View Code2.3、主窗体代码如下:public partial class RabbitMQMultithreading : Form{public delegate void ListViewDelegate<T>(T obj);public RabbitMQMultithreading(){InitializeComponent();}///<summary>/// ShowMessage重载///</summary>///<param name="msg"></param>private void ShowMessage(string msg){if (InvokeRequired){BeginInvoke(new ListViewDelegate<string>(ShowMessage), msg);}else{ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), msg });lvwMsg.Items.Insert(0, item);}}///<summary>/// ShowMessage重载///</summary>///<param name="format"></param>///<param name="args"></param>private void ShowMessage(string format, params object[] args){if (InvokeRequired){BeginInvoke(new MethodInvoker(delegate (){ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) }); lvwMsg.Items.Insert(0, item);}));}else{ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) }); lvwMsg.Items.Insert(0, item);}}///<summary>///⽣产者///</summary>///<param name="sender"></param>///<param name="e"></param>private void btnSend_Click(object sender, EventArgs e){int messageCount = 100;var factory = new ConnectionFactory{HostName = "192.168.2.242",UserName = "hello",Password = "world",Port = eDefaultPort,//5672VirtualHost = ConnectionFactory.DefaultVHost,//使⽤默认值:"/"Protocol = Protocols.DefaultProtocol,AutomaticRecoveryEnabled = true};using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);string message = "Hello World";var body = Encoding.UTF8.GetBytes(message);for (int i = 1; i <= messageCount; i++){channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);ShowMessage($"Send {message}");}}}}///<summary>///消费者///</summary>///<param name="sender"></param>///<param name="e"></param>private async void btnReceive_Click(object sender, EventArgs e){Random random = new Random();int rallyNumber = random.Next(1, 1000);int channelCount = 0;await Task.Run(() =>{try{int asyncCount = 10;List<Task<bool>> tasks = new List<Task<bool>>();var connection = RabbitMQFactory.SharedConnection;for (int i = 1; i <= asyncCount; i++){tasks.Add(Task.Factory.StartNew(() => MessageWorkItemCallback(connection, rallyNumber)));}Task.WaitAll(tasks.ToArray());string syncResultMsg = $"集结号 {rallyNumber} 已吹起号⾓--" +$"本次开启信道成功数:{tasks.Count(s => s.Result == true)}," +$"本次开启信道失败数:{tasks.Count() - tasks.Count(s => s.Result == true)}" +$"累计开启信道成功数:{channelCount + tasks.Count(s => s.Result == true)}";ShowMessage(syncResultMsg);}catch (Exception ex){ShowMessage($"集结号 {rallyNumber} 消费异常:{ex.Message}");}});}///<summary>///异步⽅法///</summary>///<param name="state"></param>///<param name="rallyNumber"></param>///<returns></returns>private bool MessageWorkItemCallback(object state, int rallyNumber){bool syncResult = false;IModel channel = null;try{IConnection connection = state as IConnection;//不能使⽤using (channel = connection.CreateModel())来创建信道,让RabbitMQ⾃动回收channel。
消息队列的参考文献
以下是关于消息队列的常见参考文献,供您参考:
1. 《RabbitMQ实战指南》(作者:阿卡索)
2. 《深入理解Kafka:核心设计与实践原理》(作者:周立)
3. 《ZeroMQ指南》(作者:Pieter Hintjens)
4. 《ActiveMQ入门指南》(作者:菲利普·科斯图希奥)
5. 《RocketMQ实战与原理解析》(作者:林子雨)
6. 《Pulsar实战指南》(作者:朱思民)
7. 《NSQ: 分布式实时消息平台》(作者:Yang Song)
8. 《消息队列:RabbitMQ、ZeroMQ、Kafka和ActiveMQ比较分析》(作者:鲍明勇)
9. 《架构探险:从零开始写分布式消息队列》(作者:李炎恢)
10. 《Apache Kafka权威指南》(作者:Neha Narkhede, Gwen Shapira, Todd Palino)
以上文献涵盖了消息队列的基本概念、原理和实战经验,可供进一步学习和参考。
rabbitmq刷盘机制RabbitMQ是一种开源的消息中间件,它使用AMQP(Advanced Message Queuing Protocol)作为消息传输协议,提供了可靠的消息传递机制。
在RabbitMQ中,刷盘(Flush)机制是一种保证数据持久化的重要手段。
本文将介绍RabbitMQ的刷盘机制,包括刷盘的原理、作用和实现方式。
一、刷盘机制的原理刷盘机制是指将内存中的数据写入磁盘的过程。
在RabbitMQ中,刷盘机制主要用于保证消息的持久化,即将消息写入磁盘后即使RabbitMQ服务器发生故障,消息也能够得到保留,不会丢失。
刷盘机制的原理是将内存中的数据写入到磁盘的持久化文件中。
当消息到达RabbitMQ服务器时,首先将消息存储在内存中的缓冲区中,然后通过刷盘机制将消息写入到磁盘中。
这样一来,即使发生服务器宕机或重启,消息仍然可以从磁盘中读取,确保消息的可靠性。
二、刷盘机制的作用刷盘机制在RabbitMQ中起到了保证消息持久化的作用。
通过将消息写入磁盘,可以确保消息不会因为服务器故障而丢失。
这对于一些重要的业务场景非常关键,比如金融、电商等行业,这些行业对消息的可靠性要求非常高。
刷盘机制还可以提高RabbitMQ服务器的性能。
因为将消息存储在内存中是比较快速的,而将消息写入磁盘则比较耗时。
通过刷盘机制,可以将一部分内存中的数据写入磁盘,释放内存空间,提高服务器的处理能力。
三、刷盘机制的实现方式在RabbitMQ中,刷盘机制有两种实现方式:同步刷盘和异步刷盘。
1. 同步刷盘同步刷盘是指每次写入磁盘时,都要等待写入操作完成后才能继续执行后续操作。
这种方式可以保证数据的可靠性,但对性能有一定的影响,因为每次都要等待写入操作完成。
2. 异步刷盘异步刷盘是指将写入磁盘的操作交给操作系统来处理,不需要等待写入操作完成。
这种方式可以提高性能,因为不需要等待写入操作完成,可以继续执行后续操作。
但是异步刷盘也存在一定的风险,如果在写入磁盘之前发生服务器故障,可能会导致数据丢失。