前言
Dubbo源码阅读分享系列文章,欢迎大家关注点赞
SPI实现部分
注册中心作用
在整个Duubbo架构中,注册中心主要完成以下三件事情:
Provider应用启动后的初始化阶段会向注册中心完成注册操作; Consumer应用启动初始化阶段会完成对所需 Provider的进行订阅操作; 在Provider发生变更时通知监听的Consumer;
Registry在整个架构中主要是对Consumer 和 Provider 所对应的业务进行解耦,从而提升系统的稳定性。关于为什么需要注册中心,大家可以参考一下微服务下的注册中心如何选择,在这篇文章中我花了一小节来介绍这个问题。
源码分析
开始之前,首先来看下关于注册中心源码的项目结构,整个项目由两块组成,一个就是核心Api,另外一个就是具体一些中间件的实现,看到这个我们可能会有一些想法,这个定然会有一些模板类以及一些工厂设计,能想到这里,说明你已经具有很好的抽象思维,废话不多说开始源码。
核心Api
dubbo-registry-api是注册中心核心对象的抽象以及实现部分,我们首先来看下几个核心对象设计
Node
Node位于Dubbo的dubbo-common项目下面,在Dubbo中Node这个接口用来抽象节点的概念,Node不仅可以表示Provider和Consumer节点,还可以表示注册中心节点。Node节点内部定义三个方法:
getUrl方法返回当前节点的URL; isAvailable方法方法返回对象是否可用; destroy方法负责销毁对象;
RegistryService
RegistryService此接口定义注册中心的功能,定义五个方法:
register方法向注册中心注册一个URL; unregister方法取消一个URL的注册; subscribe方法订阅一个URL,订阅成功之后,当订阅的数据发生变化时,注册中心会主动通知第二个参数指定的 NotifyListener对象,NotifyListener接口中定义的 notify() 方法就是用来接收该通知的; unsubscribe方法取消一个URL定义,同时当数据发生变化时候也会主动发起通知; lookup方法查询符合条件的注册的数据,subscribe采用Push方式,而lookup采用的是Pull模式;
Registry
Registry接口继承了Node和RegistryService这两个接口,实现该接口类就是注册中心接口的节点,该方法内部也提供两个默认方法reExportRegister方法和reExportUnregister方法,这两个方法实际调用还是RegistryService中的方法。
public interface Registry extends Node, RegistryService { //调用RegistryService的register default void reExportRegister(URL url) { register(url); } //调用RegistryService的unregister default void reExportUnregister(URL url) { unregister(url); }}
RegistryFactory
RegistryFactory是 Registry 的工厂类,负责创建 Registry 对象,通过@SPI 注解指定了默认的扩展名为 dubbo,@Adaptive注解表示会生成适配器类并根据 URL 参数中的 protocol 参数值选择相应的实现。
@SPI("dubbo")public interface RegistryFactory { @Adaptive({"protocol"}) Registry getRegistry(URL url);}
下图是RegistryFactory多种不同的实现,每个 Registry 实现类都有对应的 RegistryFactory 工厂实现,每个 RegistryFactory 工厂实现只负责创建对应的 Registry 对象。
AbstractRegistryFactory
AbstractRegistryFactory 是一个实现了 RegistryFactory 接口的抽象类,内部维护一个Registry的Map集合以及提供销毁和创建注册中心方法,针对不同的注册中心可以有不同的实现。
//锁 protected static final ReentrantLock LOCK = new ReentrantLock(); //Map protected static final Map<String, Registry> REGISTRIES = new HashMap<>();
销毁
销毁方法分为两个,一个全量,一个是单个,单个销毁在AbstractRegistry中调用,参数是注册实例对象。
//全量销毁 public static void destroyAll() { if (!destroyed.compareAndSet(false, true)) { return; } if (LOGGER.isInfoEnabled()) { LOGGER.info("Close all registries " + getRegistries()); } // Lock up the registry shutdown process LOCK.lock(); try { for (Registry registry : getRegistries()) { try { //一个一个销毁 registry.destroy(); } catch (Throwable e) { LOGGER.error(e.getMessage(), e); } } //清空map缓存 REGISTRIES.clear(); } finally { // Release the lock LOCK.unlock(); } } //单个销毁 public static void removeDestroyedRegistry(Registry toRm) { LOCK.lock(); try { REGISTRIES.entrySet().removeIf(entry -> entry.getValue().equals(toRm)); } finally { LOCK.unlock(); } }
创建/获取
getRegistry是对RegistryFactory实现,如果没有在缓存中,则进行创建实例对象createRegistry,createRegistry是抽象方法,为了让子类重写该方法,比如说redis实现的注册中心和zookeeper实现的注册中心创建方式肯定不同,而他们相同的一些操作都已经在AbstractRegistryFactory中实现,所以只要关注且实现该抽象方法即可。
//抽象的createRegistry方法 protected abstract Registry createRegistry(URL url); //获取实例 public Registry getRegistry(URL url) { Registry defaultNopRegistry = getDefaultNopRegistryIfDestroyed(); if (null != defaultNopRegistry) { return defaultNopRegistry; } //构建key url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); String key = createRegistryCacheKey(url); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { // double check // fix https://github.com/apache/dubbo/issues/7265. defaultNopRegistry = getDefaultNopRegistryIfDestroyed(); if (null != defaultNopRegistry) { return defaultNopRegistry; } //获取实例对象 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //没有获取到就创建 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } //放入Map集合中 REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } }
ListenerRegistryWrapper
ListenerRegistryWrapper是对RegistryFactory的扩展,创建Registry时候会包装一个ListenerRegistryWrapper对象,内部维护一个监听器RegistryServiceListener,当注册、取消注册、订阅以及取消订阅的时候,会发送通知。
AbstractRegistry
AbstractRegistry该抽象类是对Registry接口的实现,实现了Registry接口中的注册、订阅、查询、通知等方法,但是注册、订阅、查询、通知等方法只是简单地把URL加入对应的集合,没有具体的注册或订阅逻辑。此外该类还实现了缓存机制,只不过,它的缓存有两份,一份在内存,一份在磁盘。
//本地的Properties文件缓存,在内存中 与file是同步的 private final Properties properties = new Properties(); //该单线程池负责讲Provider的全量数据同步到properties字段和缓存文件中, //如果syncSaveFile配置为false,就由该线程池异步完成文件写入 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //是否异步写入 private boolean syncSaveFile; //注册数据的版本号 防止旧数据覆盖新数据 private final AtomicLong lastCacheChanged = new AtomicLong(); //保存Properties失败以后异常重试次数 private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger(); //已经注册服务的URL集合,注册的URL不仅仅可以是服务提供者的,也可以是服务消费者的 private final Set<URL> registered = new ConcurrentHashSet<>(); //消费者Url订阅的监听器集合 private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>(); //费者被通知的服务URL集合,最外部URL的key是消费者的URL,value是一个map集合,里面的map中的key为分类名,value是该类下的服务url集合 private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>(); //注册中心URL private URL registryUrl; //本地磁盘Properties文件 private File file;
变更通知
当 Provider 端暴露的 URL 发生变化时,ZooKeeper 等服务发现组件会通知 Consumer 端的 Registry 组件,Registry 组件会调用 notify() 方法,被通知的 Consumer 能匹配到所有 Provider 的 URL 列表并写入 properties 集合以及本地文件中。
protected void notify(List<URL> urls) { if (CollectionUtils.isEmpty(urls)) { return; } //遍历订阅消费者URL的监听器集合,通知他们 for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); //匹配 if (!UrlUtils.isMatch(url, urls.get(0))) { continue; } //遍历所有监听器 Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { //通知监听器,URL变化结果 notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) { logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t); } } } } } /** * Notify changes from the Provider side. * * @param url consumer side url * @param listener listener * @param urls provider latest urls */ protected void notify(URL url, NotifyListener listener, List<URL> urls) { //参数校验 if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<>(); for (URL u : urls) { //按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类 if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); //分类结果放入result categoryList.add(u); } } if (result.size() == 0) { return; } //处理通知监听器URL变化结果 Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); //把分类标实和分类后的列表放入notified的value中 categoryNotified.put(category, categoryList); //调用NotifyListener监听器 listener.notify(categoryList); //单个Url变更,并将更改信息同步至内存缓存和磁盘缓存中 saveProperties(url); } } private void saveProperties(URL url) { if (file == null) { return; } try { StringBuilder buf = new StringBuilder(); //从通知列表中取出信息 Map<String, List<URL>> categoryNotified = notified.get(url); //以空格为间隔拼接 if (categoryNotified != null) { for (List<URL> us : categoryNotified.values()) { for (URL u : us) { if (buf.length() > 0) { buf.append(URL_SEPARATOR); } buf.append(u.toFullString()); } } } //推送url至内存缓存 properties.setProperty(url.getServiceKey(), buf.toString()); //增加版本号 long version = lastCacheChanged.incrementAndGet(); if (syncSaveFile) { //如果磁盘文件未被加锁,将内存缓存同步至磁盘缓存 doSaveProperties(version); } else { //如果被加锁了,使用新的线程去执行,当前线程返回 registryCacheExecutor.execute(new SaveProperties(version)); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
缓存设计
注册中心有两份缓存,一份在内存,一份在磁盘。该方法的作用是将内存中的缓存数据保存在磁盘文件中,该方法有错误重试,最大重试次数是3,重试采用另一个线程去执行重试,不是当前线程。本地缓存设计相当于是一种容错机制,当网络抖动等原因而导致订阅失败时,Consumer端的Registry就可以通过getCacheUrls()方法获取本地缓存,从而得到最近注册的服务提供者。
//将内存中的文件写到磁盘上 public void doSaveProperties(long version) { //版本号判断 防止重复写 if (version < lastCacheChanged.get()) { return; } //判断磁盘文件是否为空 if (file == null) { return; } // Save try { //lock文件,用于加锁操作 File lockfile = new File(file.getAbsolutePath() + ".lock"); if (!lockfile.exists()) { lockfile.createNewFile(); } //RandomAccessFile提供对文件的读写操作 try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); FileChannel channel = raf.getChannel()) { //获取锁 FileLock lock = channel.tryLock(); if (lock == null) { throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties"); } // Save try { if (!file.exists()) { file.createNewFile(); } try (FileOutputStream outputFile = new FileOutputStream(file)) { //从内存缓存中获取数据 写入文件 properties.store(outputFile, "Dubbo Registry Cache"); } } finally { lock.release(); } } } catch (Throwable e) { //发生异常时,重试次数+1 savePropertiesRetryTimes.incrementAndGet(); //重试次数大于抛出异常 if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) { logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e); savePropertiesRetryTimes.set(0); return; } //再次对比版本信息,如果版本已过期,返回不再处理 if (version < lastCacheChanged.get()) { savePropertiesRetryTimes.set(0); return; } else { //重试线程 registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); } logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e); } } //磁盘中文件加载到内存中 private void loadProperties() { if (file != null && file.exists()) { InputStream in = null; try { in = new FileInputStream(file); properties.load(in); if (logger.isInfoEnabled()) { logger.info("Load registry cache file " + file + ", data: " + properties); } } catch (Throwable e) { logger.warn("Failed to load registry cache file " + file, e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
注册/订阅
AbstractRegistry 实现了 Registry 接口,当新节点注册进来时候registry() 方法,会将当前节点要注册的 URL缓存到 registered集合,当节点下线时候, unregistry() 方法会从 registered 集合删除指定的 URL。当消费者新增加一个订阅的时候,subscribe() 方法会将当前节点作为 Consumer 的 URL 以及相关的 NotifyListener 记录到 subscribed 集合,当消费者取消一个订阅的时候,unsubscribe() 方法会将当前节点的 URL 以及关联的 NotifyListener 从 subscribed 集合删除。这四个方法相对比较简单,这里不做展示,此处设计为抽象类,当子类重写的时候可以对其进行增强。
恢复/销毁
当因为网络问题与注册中心断开连接之后,会进行重连,重新连接成功之后,会调用 recover() 方法将 registered 集合中的全部 URL 重新执行register() 方法,恢复注册数据。同样,recover() 方法也会将 subscribed 集合中的 URL 重新执行subscribe() 方法,恢复订阅监听器。 当前节点下线的时候,destroy() 方法会调用 unregister() 方法和 unsubscribe() 方法将当前节点注册的 URL 以及订阅的监听全部清理掉,此外还会销毁本实例。
public void destroy() { if (logger.isInfoEnabled()) { logger.info("Destroy registry:" + getUrl()); } Set<URL> destroyRegistered = new HashSet<>(getRegistered()); if (!destroyRegistered.isEmpty()) { for (URL url : new HashSet<>(destroyRegistered)) { if (url.getParameter(DYNAMIC_KEY, true)) { try { //取消注册 unregister(url); if (logger.isInfoEnabled()) { logger.info("Destroy unregister url " + url); } } catch (Throwable t) { logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); } } } } Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed()); if (!destroySubscribed.isEmpty()) { for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { try { //取消订阅 unsubscribe(url, listener); if (logger.isInfoEnabled()) { logger.info("Destroy unsubscribe url " + url); } } catch (Throwable t) { logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); } } } } //移除注册中心 AbstractRegistryFactory.removeDestroyedRegistry(this); }
重试机制
关于重试机制,Dubbo将重试机制放在了FailbackRegistry类中,FailbackRegistry 设计思想,重写了 AbstractRegistry 中 register()/unregister()、subscribe()/unsubscribe() 以及 notify() 这五个核心方法,结合时间轮,实现失败重试机制。此外,还添加了四个未实现的抽象模板方法,由其继承者去实现,这里也就是典型的模板类的设计。
核心字段介绍
//注册失败的URL Key是注册失败的 URL,Value 是对应的重试任务 private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>(); //取消注册失败URL private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>(); //订阅失败的URL private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>(); //取消订阅失败的URL private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>(); //重试的时间间隔 private final int retryPeriod; //用于定时执行失败的时间轮 private final HashedWheelTimer retryTimer;
构造方法首先会调用父类的构造方法完成本地缓存相关的初始化操作,然后根据传入URL参数中获取重试操作的时间间隔来初始化 retryPeriod 字段,最后初始化 HashedWheelTimer时间轮。
public FailbackRegistry(URL url) { //调用 super(url); this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD); retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); }
核心方法
register
register方法重写了父类的注册方法,首先调用父类的register将url加入对应的容器,然后从failedRegistered 和failedUnregistered 两个容器中移除失败URL,然后执行doRegister方法,doRegister是抽象方法,具体的实现交给其继承者,如果注册失败抛出异常,会将URL加入failedRegistered 容器中。
@Override public void register(URL url) { if (!acceptable(url)) { logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type."); return; } //执行父类的方法加入到容器中 super.register(url); //移除注册失败 removeFailedRegistered(url); //移除取消注册失败 removeFailedUnregistered(url); try { //抽象方法交给具体子类去实现 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } //注册发生异常将注册失败放入到注册失败的容器中 addFailedRegistered(url); } }
接下来我们看下添加重试任务的方法addFailedRegistered,该方法相对比较简单,核心就是将失败的任务放到容器中,然后将失败的任务加入时间轮等待执行。
private void addFailedRegistered(URL url) { //判断容器中是是否存在任务 FailedRegisteredTask oldOne = failedRegistered.get(url); if (oldOne != null) { return; } //将任务添加容器中 FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); oldOne = failedRegistered.putIfAbsent(url, newTask); if (oldOne == null) { // never has a retry task. then start a new task for retry. //将任务提交到时间轮中 等待retryPeriod秒后执行 retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } }
对于其他unregister()、subscribe()、unsubscribe() 都与register()类似这里就不做过多介绍,简单看下提供几个抽象的方法。
public abstract void doRegister(URL url); public abstract void doUnregister(URL url); public abstract void doSubscribe(URL url, NotifyListener listener); public abstract void doUnsubscribe(URL url, NotifyListener listener);
重试任务
addFailedRegistered方法中创建FailedRegisteredTask以及其他重试任务,都是继承AbstractRetryTask,接下来我们要来关于AbstractRetryTask的设计和实现。在AbstractRetryTask抽象类中,有一个核心run方法实现已经一个抽象方法,该抽象方法也是模板类作用。
@Override public void run(Timeout timeout) throws Exception { //检查定义任务状态以及时间轮状态 if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { // other thread cancel this timeout or stop the timer. return; } //检查重试次数 if (times > retryTimes) { // reach the most times of retry. logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times."); return; } if (logger.isInfoEnabled()) { logger.info(taskName + " : " + url); } try { &nb
标签:
留言评论