前言
Dubbo源码阅读分享系列文章,欢迎大家关注点赞
SPI实现部分
注册中心
通信模块介绍
Dubbo通信模块主要的目的就是解决客户端以服务端通信的问题,核心代码都在dubbo-remoting模块,该模块提供了多种客户端和服务端通信的功能。Dubbo的通信主要包括是三部分:Exchange、Transport和Serialize,对于序列化部分的设计在单独的模块中,我们再单独聊,这篇文章主要聊Exchange、Transport设计。对于Dubbo来说没有自己的网络框架,使用现有第三方类库,因此需要设计一套标准API来兼容多种不同的通信框架,dubbo-remoting 模块的结构就是目前Dubbo兼容的所有的通信框架。在整体模块设计上,dubbo-remoting-api是其他模块上层抽象,其他子模块都是依赖第三方NIO库实现 dubbo-remoting-api模块的。因此我们想要了解清楚dubbo-remoting设计必须要理解dubbo-remoting-api的设计。对于dubbo-remoting-api大致可以分为四类,
核心API设计,主要是包括端口、编码、解码等等核心接口的抽象; buffer,主要是定义缓冲区相关的接口、抽象类以及实现类; exchange,抽Request和Response概念抽象以及扩展; transport,网络传输层的抽象,但它只负责消息的传输;
源码分析
核心API设计
Endpoint
Endpoint被翻译端点,这里可以理解为通信中对IP和Port的抽象,Client和Server端共同的抽象,两个端通过Endpoint建立TCP连接,进行通信。对于该Endpoint接口定义了三类方法:
get类方法,主要获取Endpoint的本地地址、关联的URL信息以及底层Channel关联的ChannelHandle,也就是获取建立连接需要的属性; send方法主要负责发送数据; close类方法,主要是用来关闭连接;
Channel
Channel可以理解为Client和Server端连接的通道,是NIO框架设计中不可缺少的概念,Channel继承Endpoint,因此拥有Endpoint的能力,对于Channel来说,可以给自身设计一些额外属性。
ChannelHandler
ChannelHandler可以理解为Channel的处理器,ChannelHandler 可以处理Channel的连接建立以及连接断开事件,还可以处理读取到的数据、发送的数据以及捕获到的异常。
Codec2
Codec2实现编码和解码,实现字节与消息体之间的转换,类似Netty中编码和解码。此外,Codec2接口被@SPI 接口修饰了,说明该接口是一个扩展接口,同时encode方法和 decode方法都被@Adaptive注解修饰,因此也会生成适配器类,可以根据URL中的codec值确定具体的扩展实现类,这里就体现SPI和URL灵活配置的特性。
@SPIpublic interface Codec2 { @Adaptive({Constants.CODEC_KEY}) void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException; @Adaptive({Constants.CODEC_KEY}) Object decode(Channel channel, ChannelBuffer buffer) throws IOException; enum DecodeResult { NEED_MORE_INPUT, SKIP_SOME_INPUT }}
此外还存在DecodeResult的枚举,该枚举是处理粘包和拆包使用的。
Client
Client继承了Endpoint、Channel等相关的接口,因此对于Client也具备收发消息能力,Client只可以关联一个 Channel。
RemotingServer
Server与Client不太一样地方就是可以接收多个Client发起的Channel连接,因此RemotingServer接口中存在获取多个Channel列表的接口。
Transporter
Transporter接口是Dubbo在Client和Server上又封装的一层,我们可以看到改接口被@SPI以及@Adaptive注解修饰,因此这个是个可扩展的接口,默认使用Netty的扩展,@Adaptive表示可以动态生成该适配的类,根据设置的值确定具体实现的类。
@SPI("netty")public interface Transporter { @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException; @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException;}
Transporter的实现类有主要有以下几种,每个对应的具体的NIO的实现都在其各自的包中,这样可以通过灵活配置来进行切换不同的实现。为了验证是否正确,我们简单再来看一下RemotingServer的实现,RemotingServer的实现中,包含每个具体NIO框架的实现,因此这里更加印证Transporter的的抽象,让我们可以通过Dubbo SPI修改具体Transporter扩展实现,从而切换到不同的Client和 RemotingServer实现,从而达到NIO库切换,这里我们无需修改任何代码,真正的做到开放-闭合的原则。
Transporters
Transporters该类是一种门面模式的设计,主要是解决和多个不同子模块直接进行交互的问题,通过该类设计,将公共的行为Transporter对象的创建以及ChannelHandler的处理,大家可以直接依赖Transporters类,这部分调用是在Dubbo协议初始化时候发起的,这部分我们到时候在细讲,这个章节暂时先不讲解。但是这里需要在这个看一下关于ChannelHandler的处理,此处传入了多个ChannelHandler,将多个ChannelHandler包装成为ChannelHandlerDispatcher,ChannelHandlerDispatcher实现ChannelHandler,内部维护了一个 CopyOnWriteArraySet,对外提供操作ChannelHandler方法,此处主要是为了引出后续Handler的处理流程,后续一层处理模型的源头都在这里。
到这里我们大概对Dubbo的通讯模型有了一个轮廓,我们来进行一个简单的总结,可以参考下图:
上层通过会Transporters获取到具体的Transporter扩展实现,然后通过Transporter获取Client和 RemotingServer实现; Client与RemotingServer都是通过Channel进行交互,Channel使用ChannelHandler进行数据传输,此外通过Codec2进行编解码;
Buffer设计
接口设计
ChannelBuffer的设计类似于Netty的Buffer的设计,大致可以分为五类,对于具体的实现我们在后面AbstractChannelBuffer等实现类里面进行讲解。接下来我们来看一下ChannelBufferFactory,该接口都是用来创建ChannelBuffer的,并且每个具体的实现都是单例的,可以理解为一个简单工厂的设计,可以有不同类型的ChannelBuffer的实现。
AbstractChannelBuffer
AbstractChannelBuffer维护两类索引,一类用于读写,另外一类用于读写标记;关于读写类索引就是记录当前读到什么位置以及写到什么位置了,标记类索引就是为了做数据备份和回滚使用,为了对缓冲区重复利用。该类的方法都主要是利用四个属性来操作,用来检测是否有数据可读或者还是否有空间可写等方法,做一些前置条件的校验以及索引的设置,具体的实现都是需要子类来实现。
@Override public void readBytes(byte[] dst, int dstIndex, int length) { //检查位置是否足够 checkReadableBytes(length); //此处可以理解为将readerIndex后移length个字节读取到dst数组中 //也就是数组dst的dstIndex~dstIndex+length位置 getBytes(readerIndex, dst, dstIndex, length); //readerIndex后移length个字节 readerIndex += length; } @Override public void readBytes(byte[] dst, int dstIndex, int length) { //检查位置是否足够 checkReadableBytes(length); //此处可以理解为将readerIndex后移length个字节读取到dst数组中 //也就是数组dst的dstIndex~dstIndex+length位置 getBytes(readerIndex, dst, dstIndex, length); //readerIndex后移length个字节 readerIndex += length; } @Override public void writeBytes(byte[] src, int srcIndex, int length) { //将src数组中srcIndex~srcIndex+length位置的数据写到当前的buffer中 setBytes(writerIndex, src, srcIndex, length); //将当前的writerIndex后移length writerIndex += length; }
HeapChannelBuffer
HeapChannelBuffer是ChannelBuffer的一种具体的实现,该类是基于字节数组的ChannelBuffer实现,通过byte[]数组来进行数据的存储,setBytes和getBytes通过System.arraycopy来进行对数组的操作。
//此缓冲区包装的基础堆字节数组 protected final byte[] array; @Override public void getBytes(int index, byte[] dst, int dstIndex, int length) { System.arraycopy(array, index, dst, dstIndex, length); } @Override public void setBytes(int index, byte[] src, int srcIndex, int length) { System.arraycopy(src, srcIndex, array, index, length); }
对于HeapChannelBuffer的具体的工厂的实现是HeapChannelBufferFactory,该工厂是一个单例模式,HeapChannelBufferFactory通过ChannelBuffers工具类创建固定容量的HeapChannelBuffer,此外也可以通过拷贝的形式创建HeapChannelBuffer。
@Override public ChannelBufferFactory factory() { return HeapChannelBufferFactory.getInstance(); }
DynamicChannelBuffer
DynamicChannelBuffer可以理解为一个扩展类,也就是对装饰者模式,就是对ChannelBuffer的增加强,增加动态扩容的能力,关于该类默认的实现HeapChannelBufferFactory,我可以通过指定HeapChannelBufferFactory为对应的实现添加动态扩容的能力。
//具体的ChannelBufferFactory的实现 private final ChannelBufferFactory factory; //需要扩容的buffer private ChannelBuffer buffer; public DynamicChannelBuffer(int estimatedLength) { //默认实现 this(estimatedLength, HeapChannelBufferFactory.getInstance()); } //指定具体的实现 public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) { if (estimatedLength < 0) { throw new IllegalArgumentException("estimatedLength: " + estimatedLength); } if (factory == null) { throw new NullPointerException("factory"); } this.factory = factory; buffer = factory.getBuffer(estimatedLength); }
关于如何实现ChannelBuffer的动态扩容,看懂Java ArryList扩容的,我相信一定能理解,也就是我们要控制写入时候的判断写入的空间是否足够就可以了。DynamicChannelBuffer通过ensureWritableBytes方法来实现扩容,我们来看下他是如何做的:
@Override public void ensureWritableBytes(int minWritableBytes) { //如果写入字节数小于等于可写的字节数 if (minWritableBytes <= writableBytes()) { return; } //新增容量 int newCapacity; //缓存区字节数为0 if (capacity() == 0) { //设置为1 newCapacity = 1; } else { //新增容量为缓冲区字节数 newCapacity = capacity(); } //最小新增容量 = 当前写入字节数的索引+最小写入的字节数 int minNewCapacity = writerIndex() + minWritableBytes; //如果新增容量小于最小新增容量 while (newCapacity < minNewCapacity) { //新增容量左移1位,加倍 newCapacity <<= 1; } //通过工厂类创建该容量 ChannelBuffer newBuffer = factory().getBuffer(newCapacity); //从buffer中读取数据到newBuffer中 newBuffer.writeBytes(buffer, 0, writerIndex()); //替换原来的缓存区 buffer = newBuffer; }
ByteBufferBackedChannelBuffer
ByteBufferBackedChannelBuffer该类是基于Java NIO的ByteBuffer实现的ChannelBuffer,都是通过操作ByteBuffer的API进行实现,这里我们就不展开了。
//NIO ByteBuffer private final ByteBuffer buffer; //初始化容量 private final int capacity; public ByteBufferBackedChannelBuffer(ByteBuffer buffer) { if (buffer == null) { throw new NullPointerException("buffer"); } this.buffer = buffer.slice(); capacity = buffer.remaining(); writerIndex(capacity); } public ByteBufferBackedChannelBuffer(ByteBufferBackedChannelBuffer buffer) { this.buffer = buffer.buffer; capacity = buffer.capacity; setIndex(buffer.readerIndex(), buffer.writerIndex()); }
ChannelBufferInputStream
ChannelBufferInputStream该类实现InputStream输入流的的方法,内部维护了ChannelBuffer、startIndex以及endIndex,该方法内部都是读取ChannelBuffer中的数据,startIndex和endIndex控制读取数据位置,这样就完成 InputStream的扩展实现。
//ChannelBuffer private final ChannelBuffer buffer; //开始位置 private final int startIndex; //结束位置 private final int endIndex; @Override public int read() throws IOException { if (!buffer.readable()) { return -1; } return buffer.readByte() & 0xff; }
ChannelBufferOutputStream
ChannelBufferOutputStream该类实现OutputStream输出流,内部维护了ChannelBuffer、startIndex,该方法内部都是写入到ChannelBuffer中,startIndex是标记开始写入位置。 Buffer的整体的设计到此就介绍完成,通过ChannelBufferOutputStream、ChannelBufferInputStream控制数据的输入输出,内部通过ChannelBuffer存储数据,ChannelBuffer可以根据需要进行不同的实现。
Transport设计
Transport在核心API中介绍上层访问都是通过该接口访问的,接下来我们就来探秘下Transport层都做了哪些事情。
AbstractPeer
AbstractPeer该抽象类可以理解为服务器概念,继承了Endpoint、ChannelHandler接口,内部有四个核心的属性,URL代表自身服务的地址,closing、closed表示当前服务器状态,handler就是ChannelHandler,AbstractPeer内部实现了都是委托给ChannelHandler,这是一种典型的装饰器设计模式。
//ChannelHandler private final ChannelHandler handler; //自身地址 private volatile URL url; //服务器状态 private volatile boolean closing; private volatile boolean closed; public AbstractPeer(URL url, ChannelHandler handler) { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } this.url = url; this.handler = handler; }
AbstractEndpoint
AbstractEndpoint继承AbstractPeer,可以理解为端口的抽象,内部增加Codec2和connectTimeout两个属性,在AbstractEndpoint在初始化的时候会将这两个字段初始化。
private Codec2 codec; private int connectTimeout; public AbstractEndpoint(URL url, ChannelHandler handler) { //调用父类 super(url, handler); //根据URL中的codec参数值,确定此处具体的Codec2实现类 this.codec = getChannelCodec(url); //设置connectTimeout this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); } protected static Codec2 getChannelCodec(URL url) { //获取URL协议 String codecName = url.getProtocol(); //判断有没有该扩展名 if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { //通过ExtensionLoader加载具体实现类 return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { //没有匹配到从扩展类进行加载 return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class) .getExtension(codecName)); } }
此外该接口实现Resetable接口,该接口内部只有一个reset方法,该方法通过获取URL参数信息,重置了connectTimeout的信息以及Codec2的信息。
AbstractServer
AbstractServer是对服务端的抽象,该抽象类实现AbstractEndpoint和RemotingServer,该抽象类内部有五个核心属性,localAddress、bindAddress这两个属性都是在URL参数中获取,表示Server本地的地址以及绑定的地址,默认两个值是一致的,accepts表示是Server最大的连接次数,默认是0,表述没有限制,executorRepository、executor线程池相关的属性,executorRepository负责管理线程池,executor表示当前服务管理的线程池。
//当前服务关联的线程池 ExecutorService executor; //本机地址 private InetSocketAddress localAddress; //绑定地址 private InetSocketAddress bindAddress; //最大连接数 private int accepts; //管理线程池 private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
AbstractServer初始化也就是在构造函数中完成初始化的,然后通过调用其抽象方法doOpen实现启动服务器。
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { //调用父类 super(url, handler); //从URL获取本地地址 localAddress = getUrl().toInetSocketAddress(); String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = ANYHOST_VALUE; } //绑定地址 bindAddress = new InetSocketAddress(bindIp, bindPort); //连接数 this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); try { //调用该抽象方法启动服务 doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //创建该服务对应的线程池 executor = executorRepository.createExecutorIfAbsent(url); }
AbstractClient
AbstractClient是对客户端的抽象,同样它的继承和AbstractServer也一样,只是在实现不同而已,接下来我们来看看AbstractClient的实现,该类内部有4个关键的字段,对于executor和executorRepository这两个字段与AbstractServer功能类似,这里重点来介绍connectLock和needReconnect,connectLock是当客户端进行连接、断开、重连等操作时,需要获取该锁进行同步操作,needReconnect 在客户端发送数据之前,会检查客户端的连接是否断开,如果断开了,则会根据needReconnect字段,决定是否重连。AbstractClient整体的初始化是在构造函数实现的,我们可以看到AbstractClient 定义了 doOpen、doClose、doConnect和doDisConnect四个抽象方法给子类实现,整体的设计与AbstractServer类似。
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { //调用父类构造方法 super(url, handler); //从URL获取是否重连字段 默认是 needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true); //初始化Executor initExecutor(url); try { //初始化具体的底层实现client doOpen(); } catch (Throwable t) { //关闭 close(); throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } try { //创建连接 connect(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress()); } } catch (RemotingException t) { if (url.getParameter(Constants.CHECK_KEY, true)) { close(); throw t; } else { logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t); } } catch (Throwable t) { close(); throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } }
AbstractChannel
AbstractChannel的设计也是类似模板类的设计,对于不同的NIO框架来说有不同的Channel的实现,因此对于Dubbo来说也必须去抽象该实现,具体的不同交由子类进行实现,子类做映射。该类内部只有有一个Send方法,为了判断当前的连接是否还在,没有实现具体的发送消息。
Netty4
NettyTransporter
NettyTransporter实现Transporter,当SPI机制触发的时候会自动加载实现NettyServer、NettyClient初始化创建。
NettyServer
接下来我们来看下Netty4中关于doOpen方法的实现,此处就是Netty Server启动的核心,也是Dubbo网络通信的服务端能力的提供者,就是Dubbo和Netty结合的核心。
protected void doOpen() throws Throwable { //创建ServerBootstrap bootstrap = new ServerBootstrap(); //创建boss EventLoopGroup bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss"); //创建worker EventLoopGroup workerGroup = NettyEventLoopFactory.eventLoopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker"); //创建一个Netty的ChannelHandler final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); //此处的Channel是Dubbo的Channel channels = nettyServerHandler.getChannels(); //会话保持 boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE); bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopFactory.serverSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_KEEPALIVE, keepalive) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? //连接空闲超时时间 int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); //创建Netty实现的decoder和encoder NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); &nb
标签:
留言评论