dubbo源码解析2.0
- 格式:pdf
- 大小:2.56 MB
- 文档页数:97
(万字好⽂)Dubbo服务熔断与降级的深⼊讲解代码实战原⽂链接:⼀、Dubbo服务降级实战1 mock 机制谈到服务降级,Dubbo 本⾝就提供了服务降级的机制;⽽ Dubbo 的服务降级机制主要是利⽤服务消费者的 mock 属性。
服务消费者的 mock 属性有以下三种使⽤⽅式,下⾯将带着例⼦简单介绍⼀下。
1.1 服务消费者注册url的mock属性例⼦:mock=return+null,即当服务提供者出现异常(宕机或者业务异常),则返回null给服务消费者。
2021-01-26 09:39:54.631 [main] [INFO ] [o.a.d.r.z.ZookeeperRegistry] [] [] - [DUBBO] Notify urls for subscribe url consumer://127.0.0.1/com.winfun.service.DubboServiceOne?application=dubbo-service&category=providers,configurators,routers& 1.2 @DubboReference注解或者标签的mock属性例⼦:mock="return null",即当服务提供者出现异常(宕机或者业务异常),则返回null给服务消费者。
public class HelloController{@DubboReference(check = false,lazy = true,retries = 1,mock = "return null")private DubboServiceOne dubboServiceOne;//.....}1.3 服务消费者mock属性设置为true+Mock实现类例⼦:Mock实现类为 Dubbo接⼝的实现类,并且 Mock实现类与 Dubbo接⼝放同⼀个路径下(可不同项⽬,但是保证包路径是⼀致的)。
Dubbo原理解析-注册中心之Zookeeper协议注册中心下面我们来看下开源dubbo推荐的业界成熟的zookeeper做为注册中心,zookeeper是hadoop的一个子项目是分布式系统的可靠协调者,他提供了配置维护,名字服务,分布式同步等服务。
对于zookeeper的原理本文档不分析,后面有时间在做专题。
zookeeper注册中心Zookeeper对数据存储类似linux的目录结构,下面给出官方文档对dubbo注册数据的存储示例假设读者对zookeeper有所了解,能够搭建zookeeper服务,其实不了解也没关系,谷歌百度下分分钟搞起。
作为测试调试dubbo,我是在本地起的zookeeper指定zookeeper配置文件地址配置文件中两个关键参数:dataDir zookeeper存储文件的地址clientPort 客户端链接的端口号Dubbo服务提供者配置<dubbo:registry protocol=”zookeeper” address="127.0.0. 1:2181" /><beanid="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/><dubbo:serviceinterface="com .alibaba.dubbo.demo.DemoServi ce"ref="demoService"/>除了配置注册中心的,其他都一样Dubbo服务消费者配置<dubbo:registryprotocol=”zookeeper” address="127.0.0. 1:2181"/><dubbo:referenceid="demoService"interface="com.a libaba.dubbo.demo.DemoService"/>除了配置注册中心的,其他都一样客户端获取注册器服务的提供者和消费者在RegistryProtocol利用注册中心暴露(export)和引用(refer)服务的时候会根据配置利用Dubbo的SPI机制获取具体注册中心注册器Registry registry =registryFactory.getRegistry(url);这里的RegistryFactory是ZookeeperRegistryFactory看如下工厂代码public class ZookeeperRegistryFactory extends AbstractRegistryFactory { public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); }}这里创建zookeepr注册器ZookeeperRegistryZookeeperTransporter是操作zookeepr 的客户端的工厂类,用来创建zookeeper客户端,这里客户端并不是zookeeper源代码的自带的,而是采用第三方工具包,主要来简化对zookeeper的操作,例如用zookeeper做注册中心需要对zookeeper节点添加watcher 做反向推送,但是每次回调后节点的watcher都会被删除,这些客户会自动维护了这些watcher,在自动添加到节点上去。
Dubbo源码解析经过上一篇dubbo源码解析-简单原理、与spring融合的铺垫,我们已经能简单的实现了dubbo的服务引用.其实上一篇中的代码,很多都是从dubbo源码中复制出来,甚至有些类名,变量名都没改.那请问,我为什么要这么做?我认为学习一个框架,无非就三个步骤.•掌握基本使用•看过源码,知道其中原理•临摹源码,自己仿写一个简易的框架其实大家都清楚,编程这东西,最关键是多动手.也就是,第三步才是最关键的.但是现实也是非常残酷的,绝大多数人都停留在第一步.光是第二步,都有些让人产生的心里恐惧.所以在写服务引用的时候,我就想到了小时候看纪晓岚的一个片段.当时红楼梦是禁书,纪晓岚为了让太后看红楼梦,就把红楼梦这个名字换成了石头记.这样太后自然就没有心里负担.我觉得用一个图来描述可能更贴切当然临摹源码的这个过程,依肥朝拙见,也需要分为三个过程,分别是入门版(用最简单的代码表达出框架原理)、进阶版(加入设计模式等思想,在入门版的基础上优化代码)、高级版(和框架代码基本一致).当然上一篇的入门版只是抛砖引玉,等整个dubbo源码解析系列完结之后,和大家一起临摹dubbo源码也在计划当中.当然更多后续进展关注肥朝即可.•描述一下dubbo服务引用的过程,原理•既然你提到了dubbo的服务引用中封装通信细节是用到了动态代理,那请问创建动态代理常用的方式有哪些,他们又有什么区别?dubbo中用的是哪一种?(高频题)•除了JDK动态代理和CGLIB动态代理外,还知不知道其他实现代理的方式?(区分度高)看源码对于大多数人来说,最难的一点莫过于"从源码的哪个地方开始看".虽然我之前数十篇dubbo源码解析都在回答这个问题,但是每发出一篇,都还是有小伙伴私信问我同样的问题.对此,我当然是选择"原谅他".因此,本篇我又再次粗暴式的点题,"怎么看源码".就把本篇来说,这个服务引用的原理,我们要从哪里开始看呢?我们一起看一下官方文档如果你在上一篇中把我贴出来的demo都实现过一遍,再看到这个图,就不难总结出服务引用无非就是做了两件事•将spring的schemas标签信息转换bean,然后通过这个bean 的信息,连接、订阅zookeeper节点信息创建一个invoker •将invoker的信息创建一个动态代理对象温馨提示:除了看官方文档入手,在dubbo源码解析-服务暴露原理中我还提到了从输出日志入手.当然,我这里列举了两种方式只是给你提供参考,并不是说一共就只有这两种方式,也不是说,这两种就是最优的.直入主题有部分朋友反馈说代码贴图手机阅读不友好,但是如果不贴图的话,很多朋友看完文章自己debug的时候找相应的类和方法又要花费大量时间,所以折中一下,贴图和贴代码结合public Invoker refer(Class type, URL url) throws RpcException {url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.RE GISTRY_KEY);//序号2,这里的逻辑和之前分享的'zookeeper连接'基本一致,不熟悉的可以回去看看Registry registry = registryFactory.getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Con stants.REFER_KEY));String group = qs.get(Constants.GROUP_KEY);if (group != null && group.length() > 0 ) {if( ( MA_SPLIT_PATTERN.split( group ) ).length > 1 || "*".equals( group ) ) {return doRefer( getMergeableCluster(), registry, type, url );}}return doRefer(cluster, registry, type, url);}private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {RegistryDirectory directory = new RegistryDirectory(type, url);directory.setRegistry(registry);directory.setProtocol(protocol);URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());if (! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {registry.register(subscribeUrl.addParameters(Constants.CAT EGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));}//序号3,这里的逻辑和之前分享的'zookeeper订阅'基本一致,不熟悉的可以回去看看directory.subscribe(subscribeUrl.addParameter(Constants.C ATEGORY_KEY,Constants.PROVIDERS_CATEGORY+ "," + Constants.CONFIGURATORS_CATEGORY+ "," + Constants.ROUTERS_CATEGORY));//序号4,cluster关键字在集群容错系列也提到过,不熟悉的可以回去看看return cluster.join(directory);}上面的这4步,就完成了schemas标签信息到invoker的转换,那么下面就是创建代理对象了(序号5)private T createProxy(Map map){//......(省略部分代码)// 创建服务代理return (T) proxyFactory.getProxy(invoker);}我们知道,要封装这个通信细节,让用户像以本地调用方式调用远程服务,就必须使用代理,然后说到动态代理,一般我们就想到两种,一种是JDK的动态代理,一种是CGLIB的动态代理,那我们看看两者有什么特点.JDK的动态代理代理的对象必须要实现一个接口,而针对于没有接口的类,则可用CGLIB.要明白两者区别必须要了解原理,之前反复强调,明白了原理自然一通百通.CGLIB其原理也很简单,对指定的目标类生成一个子类,并覆盖其中方法实现增强,但由于采用的是继承,所以不能对final修饰的类进行代理.除了以上两种大家都很熟悉的方式外,其实还有一种方式,就是javassist生成字节码来实现代理(后面会详细讲,dubbo多处用到了javassist).那dubbo究竟用到了哪种方式实现代理呢?我们往下看序号5的结束本篇也接近了尾声.本篇综合性较强,其中涉及到之前的内容本篇将不再重复提及,可根据注释中的标记自行查看.2017即将结束,这一年来,给我的一些感悟就是,任何事情无关大小,只要加上"坚持"二字,都会变得格外的不易.大家都知道,健身房主要赚的就是那些坚持不下去人的钱,我就有一个爱好健身的朋友,他能坚持每天健身,我也问过他"秘诀".他是这样说的别人都是为了做别的事轻易就放弃健身,而我最常和别人说的话是,我要去健身了,不能聚会了这个真实的例子,总结起来也就一句话时间在哪,成就就在哪.其实所谓门槛,能力够了就是门,能力不足就是坎.期待下周的dubbo源码解析继续与你相遇.鉴于本人才疏学浅,不对的地方还望斧正,也欢迎关注我的简书,名称为肥朝。
Dubbo的负载均衡算法源码分析Dubbo提供了四种负载均衡:RandomLoadBalance,RoundRobinLoadBalance,LeastActiveLoadBalance,ConsistentHashLoadBalance。
这⾥顺便说下Dubbo的负载均衡是针对单个客户端的,不是全局的。
以下代码基于2.7.2-SNAPSHOT版本。
LoadBalanceLoadBalance接⼝只提供了⼀个对外暴露的⽅法:<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;AbstractLoadBalanceAbstractLoadBalance使⽤模板设计模式,具体负载均衡算法由⼦类的doSelect实现public abstract class AbstractLoadBalance implements LoadBalance {//预热权重计算,provider刚启动权重在预热时间内随启动时间逐渐增加,最⼩为1static int calculateWarmupWeight(int uptime, int warmup, int weight) {int ww = (int) ((float) uptime / ((float) warmup / (float) weight));return ww < 1 ? 1 : (ww > weight ? weight : ww);}//模板⽅法,参数判断public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {if (CollectionUtils.isEmpty(invokers)) {return null;}if (invokers.size() == 1) {return invokers.get(0);}return doSelect(invokers, url, invocation);}//真正执⾏负载均衡的⽅法protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);//计算权重protected int getWeight(Invoker<?> invoker, Invocation invocation) {int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);if (weight > 0) {long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);if (timestamp > 0L) {int uptime = (int) (System.currentTimeMillis() - timestamp);int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);//预热时间,默认为10分钟if (uptime > 0 && uptime < warmup) {//预热weight = calculateWarmupWeight(uptime, warmup, weight);}}}return weight >= 0 ? weight : 0;}}RandomLoadBalancepublic class RandomLoadBalance extends AbstractLoadBalance {public static final String NAME = "random";@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size();//invoker个数boolean sameWeight = true;//每个invoker都有相同权重int[] weights = new int[length];//权重数组int firstWeight = getWeight(invokers.get(0), invocation);//第⼀个权重weights[0] = firstWeight;int totalWeight = firstWeight;//总权重//计算总权重和判断权重是否相同for (int i = 1; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);weights[i] = weight;totalWeight += weight;if (sameWeight && weight != firstWeight) {sameWeight = false;}}//权重不相同if (totalWeight > 0 && !sameWeight) {//得到⼀个在[0,totalWeight)的偏移量,然后这个偏移量所在的invokerint offset = ThreadLocalRandom.current().nextInt(totalWeight);for (int i = 0; i < length; i++) {offset -= weights[i];if (offset < 0) {return invokers.get(i);}}}//权重相同,直接随机[0,length)return invokers.get(ThreadLocalRandom.current().nextInt(length));}}RoundRobinLoadBalancepublic class RoundRobinLoadBalance extends AbstractLoadBalance {public static final String NAME = "roundrobin";//循环周期,如果在这个周期内invoker没有被客户端获取,那么该invoker对应的轮询记录将被删除。
Dubbo解析XML的过程1. 概述Dubbo是阿里巴巴开源的一款高性能的Java RPC框架,提供了服务治理、服务容错、负载均衡等功能。
在Dubbo中,XML配置文件是非常重要的一部分,通过XML配置来定义服务接口和各种配置参数。
对于Dubbo框架来说,解析XML配置文件是一个非常关键的过程,本文将深入探讨Dubbo框架是如何解析XML配置的。
2. 加载配置文件Dubbo在启动的时候会加载XML配置文件,这些配置文件通常包含了服务的接口定义、协议配置、注册中心位置区域、服务提供者的配置等信息。
Dubbo会在启动时读取这些配置文件,并将其解析成对应的配置对象。
3. 解析XMLDubbo使用了基于XSD的XML Schema来定义配置文件的结构,可以通过XSD文件来验证XML配置文件的合法性。
Dubbo框架内部使用了一些类来进行XML解析的工作,其中包括了XML文档解析器、元素解析器等。
4. 创建对象模型在XML解析的过程中,Dubbo会将解析得到的配置信息转化成对应的Java对象模型。
对于服务接口的定义,Dubbo会创建对应的接口描述对象;对于协议配置,Dubbo会创建对应的协议配置对象。
5. 配置参数赋值一旦XML配置文件被成功解析成对象模型,Dubbo会将这些对象模型中的配置参数赋值给对应的组件。
将服务接口描述对象中的接口名称赋值给服务接口代理对象。
6. 注册配置解析XML配置文件的最后一步是将解析得到的配置信息注册到Dubbo框架中。
这个过程包括了将服务接口注册到服务接口管理器、将协议配置注册到协议管理器、将注册中心位置区域注册到注册中心管理器等。
7. 总结在Dubbo框架中,XML配置文件的解析是一个非常重要的过程,它直接影响着整个Dubbo框架的运行。
通过本文的介绍,希望读者能够更加深入地了解Dubbo是如何解析XML配置文件的。
同时也能够更好地理解Dubbo框架的内部工作原理,为使用和定制Dubbo框架提供参考和帮助。
Dubbo服务发现源码解析模块源码模块⼀、⼀、源码1.1 源码模块组织Dubbo⼯程是⼀个Maven多Module的项⽬,以包结构来组织各个模块。
核⼼模块及其关系,如图所⽰:1.2 模块说明dubbo-common 公共逻辑模块,包括Util类和通⽤模型。
dubbo-remoting 远程通讯模块,相当于Dubbo协议的实现,如果RPC⽤RMI协议则不需要使⽤此包。
dubbo-rpc 远程调⽤模块,抽象各种协议,以及动态代理,只包含⼀对⼀的调⽤,不关⼼集群的管理。
dubbo-cluster 集群模块,将多个服务提供⽅伪装为⼀个提供⽅,包括:负载均衡、容错、路由等,集群的地址列表可以是静态配置的,也可以是由注册中⼼下发。
dubbo-registry 注册中⼼模块,基于注册中⼼下发地址的集群⽅式,以及对各种注册中⼼的抽象。
dubbo-monitor 监控模块,统计服务调⽤次数,调⽤时间的,调⽤链跟踪的服务。
dubbo-config 配置模块,是Dubbo对外的API,⽤户通过Config使⽤Dubbo,隐藏Dubbo所有细节。
dubbo-container 容器模块,是⼀个Standalone的容器,以简单的Main类加载Spring启动,因为服务通常不需要Tomcat/JBoss等Web容器的特性,没必要⽤Web容器去加载服务。
因为服务通常不需要 Tomcat/JBoss 等 Web 容器的特性,没必要⽤ Web 容器去加载服务。
⼆、服务发现Dubbo的应⽤会在启动时完成服务注册或订阅(不论是⽣产者,还是消费者)如下图所⽰。
图中⼩⽅块Protocol, Cluster, Proxy, Service, Container, Registry, Monitor代表层或模块,蓝⾊的表⽰与业务有交互,绿⾊的表⽰只对Dubbo内部交互。
图中背景⽅块Consumer, Provider, Registry, Monitor代表部署逻辑拓普节点。
dubbo源码解析⼆invoker链 在上⼀篇中,调⽤远程服务的路径是业务接⼝代理proxy->MockClusterInvoker.invoke->invoker⽗类AbstractClusterInvoker.invoke->FailoverClusterInvoke.invoke。
AbstractClusterInvoker.invoke代码List<Invoker<T>> invokers = list(invocation); 从zk获取最新的invoker列表directory()可以看做是 Invoker 集合,且这个集合中的元素会随注册中⼼的变化⽽进⾏动态调整。
服务导⼊的时候,消费者向注册中⼼注册服务后订阅相应接⼝,传⼊回调listener参数,当该接⼝对应的zk临时⽂件数量即provider数量发⽣变化时,z 更新provider list。
和上篇RegistryProtocol⼀样,根据传⼊的url的protocol通过⾃适应代码⽣成Protocol代理类调⽤refer⽣成invoker,封装成provider list。
protocol封装如下:Protocol$Adpative->QosProtocolWrapper->ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocolrefer传递调⽤时,和registy略有不⼀样,在ProtocolListenerWrapper的refer处,if分⽀区分两种invoker⽣成⽅式。
这块代码不再深⼊,⼤致是先new DubboInvoker,然后在之上封装动态获取的filter链,继续封装成ListenerInvokerWrapper后返回。
directory本⾝实现listener接⼝,可以随时更新实例的invoker list。
Dubbo源码解析(⼀)DubboSPIDubbo 扩展机制 SPI在 Dubbo 中,SPI 贯穿在整个 Dubbo 的核⼼。
所以有必要先对 Dubbo 中 SPI 做⼀个详细的介绍。
JDK SPI之前写过⼀篇介绍,可以点击查看。
Dubbo SPIDubbo 实现了⾃⼰的 SPI 机制除了可以配置在 META-INF/services ⽬录下,还可以配置在 META-INF/dubbo 和 META-INF/dubbo/internal配置⽂件内容采⽤ key=value 的形式。
这样可以按需实例化加载类,⽽不⽤像 JDK ⼀样在启动时⼀次性加载实例化扩展点的所有实现,浪费性能。
出现异常时可以更准确定位增加了对 Duboo ⾃⼰实现的 IOC 和 AOP 的⽀持源码解析这⾥使⽤的版本是 dubbo-2.6.4这⾥的分析流程是根据例⼦来分析内部源码的实现注意,在阅读下⾯的源码解析时,有些跟当前流程⽆关的代码我会标注忽略点 - X,表⽰这段代码先跳过,不影响当前流程,且在后⾯⽤到的地⽅我会再重新解释该忽略点的意思。
1、Dubbo SPI 的基本⽰例接⼝,注意加 @SPI 注解@SPIpublic interface Robot {void sayHello();}实现类public class MIRobot implements Robot {@Overridepublic void sayHello() {System.out.println("⼤家好,我是⼩⽶机器⼈...");}}在⽂件夹 resources/META-INF/services 下添加配置⽂件,⽂件名 com.lin.spi.Robot(接⼝的路径)⽂件内容miRobot = com.lin.spi.MIRobot在测试类中调⽤public class DubboSPITest {@Testpublic void test() {ExtensionLoader<Robot> extensionLoader =ExtensionLoader.getExtensionLoader(Robot.class);Robot miRobot = extensionLoader.getExtension("miRobot");miRobot.sayHello();}}输出⼤家好,我是⼩⽶机器⼈...Process finished with exit code 0上⾯就是 Dubbo SPI 的基本使⽤,接下来开始源码分析。
Dubbo源码分析(六)Dubbo通信的编码解码机制TCP的粘包拆包问题我们知道Dubbo的⽹络通信框架Netty是基于TCP协议的,TCP协议的⽹络通信会存在粘包和拆包的问题,先看下为什么会出现粘包和拆包当要发送的数据⼤于TCP发送缓冲区剩余空间⼤⼩,将会发⽣拆包待发送数据⼤于MSS(最⼤报⽂长度),TCP在传输前将进⾏拆包要发送的数据⼩于TCP发送缓冲区的⼤⼩,TCP将多次写⼊缓冲区的数据⼀次发送出去,将会发⽣粘包接收数据端的应⽤层没有及时读取接收缓冲区中的数据,将发⽣粘包以上四点基本上是出现粘包和拆包的原因,业界的解决⽅法⼀般有以下⼏种:将每个数据包分为消息头和消息体,消息头中应该⾄少包含数据包的长度,这样接收端在接收到数据后,就知道每⼀个数据包的实际长度了(Dubbo就是这种⽅案)消息定长,每个数据包的封装为固定长度,不够补0在数据包的尾部设置特殊字符,⽐如FTP协议Dubbo消息协议头规范在dubbo.io官⽹上找到⼀张图,协议头约定dubbo的消息头是⼀个定长的 16个字节的数据包:magic High & Magic Low:2byte:0-7位 8-15位:类似java字节码⽂件⾥的魔数,⽤来判断是不是dubbo协议的数据包,就是⼀个固定的数字Serialization id:1byte:16-20位:序列id,21 event,22 two way ⼀个标志位,是单向的还是双向的,23 请求或响应标识,status:1byte: 24-31位:状态位,设置请求响应状态,request为空,response才有值Id(long):8byte:32-95位:每⼀个请求的唯⼀识别id(由于采⽤异步通讯的⽅式,⽤来把请求request和返回的response对应上)data length:4byte:96-127位:消息体长度,int 类型看完这张图,⼤致可以理解Dubbo通信协议解决的问题,Dubbo采⽤消息头和消息体的⽅式来解决粘包拆包,并在消息头中放⼊了⼀个唯⼀Id来解决异步通信关联request和response的问题,下⾯以⼀次调⽤为⼊⼝分为四个部分来看下源码具体实现Comsumer端请求编码private class InternalEncoder extends OneToOneEncoder {@Overrideprotected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);try {codec.encode(channel, buffer, msg);} finally {NettyChannel.removeChannelIfDisconnected(ch);}return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());}}这个InternalEncoder是⼀个NettyCodecAdapter的内部类,我们看到codec.encode(channel, buffer, msg)这⾥,这个时候codec=DubboCountCodec,这个是在构造⽅法中传⼊的,DubboCountCodec.encode-->ExchangeCodec.encode-->ExchangeCodec.encodeRequestprotected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {//获取序列化⽅式,默认是Hessian序列化Serialization serialization = getSerialization(channel);// new了⼀个16位的byte数组,就是request的消息头byte[] header = new byte[HEADER_LENGTH];// 往消息头中set magic数字,这个时候header中前2个byte已经填充Bytes.short2bytes(MAGIC, header);// set request and serialization flag.第三个byte已经填充header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;if (req.isEvent()) header[2] |= FLAG_EVENT;// set request id.这个时候是0Bytes.long2bytes(req.getId(), header, 4);// 编码 request data.int savedWriteIndex = buffer.writerIndex();buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);//序列化ObjectOutput out = serialization.serialize(channel.getUrl(), bos);if (req.isEvent()) {encodeEventData(channel, out, req.getData());} else {//编码消息体数据encodeRequestData(channel, out, req.getData());}out.flushBuffer();bos.flush();bos.close();int len = bos.writtenBytes();checkPayload(channel, len);//在消息头中设置消息体长度Bytes.int2bytes(len, header, 12);// writebuffer.writerIndex(savedWriteIndex);buffer.writeBytes(header); // write header.buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);}就是这⽅法,对request请求进⾏了编码操作,具体操作我写在代码的注释中,就是刚刚我们分析的消息头的代码实现Provider端请求解码看到NettyCodecAdapter中的InternalDecoder这个类的messageReceived⽅法,这⾥就是Provider端对于Consumer端的request请求的解码public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {···try {// decode object.do {saveReaderIndex = message.readerIndex();try {msg = codec.decode(channel, message);} catch (IOException e) {buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;throw e;}···进⼊DubboCountCodec.decode--ExchangeCodec.decode// 检查 magic number.if (readable > 0 && header[0] != MAGIC_HIGH···}// check 长度如果⼩于16位继续等待if (readable < HEADER_LENGTH) {return DecodeResult.NEED_MORE_INPUT;}// get 消息体长度int len = Bytes.bytes2int(header, 12);checkPayload(channel, len);//消息体长度+消息头的长度int tt = len + HEADER_LENGTH;//如果总长度⼩于tt,那么返回继续等待if (readable < tt) {return DecodeResult.NEED_MORE_INPUT;}// limit input stream.ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);try {//解析消息体内容return decodeBody(channel, is, header);} finally {···}这⾥对于刚刚的request进⾏解码操作,具体操作步骤写在注释中了Provider端响应编码当服务端执⾏完接⼝调⽤,看下服务端的响应编码,和消费端不⼀样的地⽅是,服务端进⼊的是ExchangeCodec.encodeResponse⽅法try {//获取序列化⽅式默认Hession协议Serialization serialization = getSerialization(channel);// 初始化⼀个16位的headerbyte[] header = new byte[HEADER_LENGTH];// set magic 数字Bytes.short2bytes(MAGIC, header);// set request and serialization flag.header[2] = serialization.getContentTypeId();if (res.isHeartbeat()) header[2] |= FLAG_EVENT;// set response status.这⾥返回的是OKbyte status = res.getStatus();header[3] = status;// set request id.Bytes.long2bytes(res.getId(), header, 4);int savedWriteIndex = buffer.writerIndex();buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);ObjectOutput out = serialization.serialize(channel.getUrl(), bos);// 编码返回消息体数据或者错误数据if (status == Response.OK) {if (res.isHeartbeat()) {encodeHeartbeatData(channel, out, res.getResult());} else {encodeResponseData(channel, out, res.getResult());}} else out.writeUTF(res.getErrorMessage());out.flushBuffer();bos.flush();bos.close();int len = bos.writtenBytes();checkPayload(channel, len);Bytes.int2bytes(len, header, 12);// writebuffer.writerIndex(savedWriteIndex);buffer.writeBytes(header); // write header.buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);} catch (Throwable t) {// 发送失败信息给Consumer,否则Consumer只能等超时了if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {try {// FIXME 在Codec中打印出错⽇志?在IoHanndler的caught中统⼀处理?logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);Response r = new Response(res.getId(), res.getVersion());r.setStatus(Response.BAD_RESPONSE);r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));channel.send(r);return;} catch (RemotingException e) {logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);}}// 重新抛出收到的异常···}基本上和消费⽅请求编码⼀样,多了⼀个步骤,⼀个是在消息头中加⼊了⼀个状态位,第⼆个是如果发送有异常,则继续发送失败信息给Consumer,否则Consumer只能等超时了Conmsuer端响应解码和上⾯的解码⼀样,具体操作是在ExchangeCodec.decode--DubboCodec.decodeBody中。