URL
date
AI summary
slug
status
tags
summary
type

项目介绍

之前写过一篇文章整体介绍了一下canal的核心组件
文中是这么描述canal-client-adapter的
canal-client-adapter,也属于canal-client,可以把它理解为官方提供的一些常见的异构数据同步通道的实现,比如同步到elasticsearch、hbase、mongodb、mysql、redis、kafka等。所以和canal-client一样,它支持TCP和MQ的消费模式。 当然如果预设的满足不了,那就还是需要自定义了,可以选择基于canal-client-adapter的框架自定义扩展,也可以自己消费对应的MQ或者store去实现。
canal-client-adapter是canal官方提供的整个数据异构工程的最后一个环节。通过抽象整个同步流程,屏蔽不同异构数据源的api,构建了一套异构数据同步的框架。

项目结构

全篇文章基于canal-1.1.5版本分析
我们先来看看这个项目的模块分层
notion image
上面这些模块,主要分成三类:
  1. launcher——启动模块,里面主要是启动类加一些核心配置
  1. common——公共模块
  1. 其他——都是具体的adapter实现模块,比如es7x,就是具体用es7x的客户端同步到es里

启动流程分析

启动类还是最直观,最容易跟踪的东西。那我们就从launcher模块入手
notion image
这里面和启动流程相关的类有:
  1. BootstrapConfiguration
  1. CanalAdapterService
  1. ApplicationConfigMonitor
  1. CanalAdapterApplication
下面我们按照从易到难的顺序来分析:CanalAdapterApplication > BootstrapConfiguration > ApplicationConfigMonitor > CanalAdapterApplication

CanalAdapterApplication

@SpringBootApplication public class CanalAdapterApplication { public static void main(String[] args) { SpringApplication application = new SpringApplication(CanalAdapterApplication.class); application.setBannerMode(Banner.Mode.OFF); application.run(args); } }
只是个单纯的SpringBoot启动类,我们可以通过启动这个类在本地调试CanalClientAdapter

BootstrapConfiguration

canal-adapter支持两种配置模式:
  1. 从mysql读取配置
  1. 从配置文件读取配置
而这个类主要就是用来从数据库加载远程配置文件的:
public class BootstrapConfiguration { @Autowired private Environment env; private RemoteConfigLoader remoteConfigLoader = null; @PostConstruct public void loadRemoteConfig() { remoteConfigLoader = RemoteConfigLoaderFactory.getRemoteConfigLoader(env); if (remoteConfigLoader != null) { remoteConfigLoader.loadRemoteConfig(); remoteConfigLoader.loadRemoteAdapterConfigs(); remoteConfigLoader.startMonitor(); // 启动监听 } } @PreDestroy public synchronized void destroy() { if (remoteConfigLoader != null) { remoteConfigLoader.destroy(); } } } // com.alibaba.otter.canal.adapter.launcher.monitor.remote.RemoteConfigLoaderFactory#getRemoteConfigLoader public static RemoteConfigLoader getRemoteConfigLoader(Environment env) { try { String jdbcUrl = env.getProperty("canal.manager.jdbc.url"); if (!StringUtils.isEmpty(jdbcUrl)) { // load remote config String driverName = env.getProperty("canal.manager.jdbc.driverName"); String jdbcUsername = env.getProperty("canal.manager.jdbc.username"); String jdbcPassword = env.getProperty("canal.manager.jdbc.password"); return new DbRemoteConfigLoader(driverName, jdbcUrl, jdbcUsername, jdbcPassword); } // 可扩展其它远程配置加载器 } catch (Exception e) { logger.error(e.getMessage(), e); } return null; }
可以看到,上面远程加载配置文件的流程总共分成三个核心动作(line 12-14):
  1. loadRemoteConfig——从mysql下载application.yml 主配置文件到本地磁盘
  1. loadRemoteAdapterConfigs——从mysql下载adapter相关配置文件到本地磁盘
  1. startMonitor——启动监听,定时触发loadRemoteConfigloadRemoteAdapterConfigs
public void startMonitor() { // 监听application.yml变化 executor.scheduleWithFixedDelay(() -> { try { loadRemoteConfig(); } catch (Throwable e) { logger.error("scan remote application.yml failed", e); } }, 10, 3, TimeUnit.SECONDS); // 监听adapter变化 executor.scheduleWithFixedDelay(() -> { try { loadRemoteAdapterConfigs(); } catch (Throwable e) { logger.error("scan remote adapter configs failed", e); } }, 10, 3, TimeUnit.SECONDS); }
loadRemoteConfigloadRemoteAdapterConfigs的具体代码逻辑也比较简单。

loadRemoteConfig

通过下列sql去表canal_config找一条id为2的特定记录,这条记录的content就是canal-adapter的主配置文件application.yml
select name, content, modified_time from canal_config where id=2

loadRemoteAdapterConfigs

adapter的相关配置是记录在canal_adapter_config表里的,这张表每一条记录就代表一个adapter配置文件
因为记录条数可能会比较多,所以这里的查询略有优化。首先会通过一个简单查询(不包括content配置文件字段),然后根据结果和本地的配置文件做modified_time的比对,如果发现有变更再会去mysql拉取有变更的记录的content
select id, category, name, modified_time from canal_adapter_config select id, category, name, content, modified_time from canal_adapter_config where id in ()
上面从远程下载的两种配置文件非常的重要,这里分别介绍一下:

application.yml

主配置文件主要用来配置:
  1. 消费模式(直连canal还是通过消息队列)
  1. 订阅关系(topic、消费组、消费方式(或者叫异构模式))
  1. 数据源定义(主要是消费消息时用来做一些数据查询转换以及etl时的数据源)

adapter.yml

adapter相关的配置文件。主要用来定义数据的清洗逻辑、转换逻辑
还有一点需要注意,该类是通过下面这个配置文件注入的
org.springframework.cloud.bootstrap.BootstrapConfiguration=\\ com.alibaba.otter.canal.adapter.launcher.config.BootstrapConfiguration
这里用了比较取巧的方式,为了让这个类更早的被初始化,然后去加载配置。 并且,还要把应用里的application.yml等配置文件都删掉才能生效,不然会直接追加到本地配置文件application.yml
下图是从官方repo里介绍“从远程加载配置”的用法:
notion image

ApplicationConfigMonitor

这个类如它的名字一样,主要是用来监听application.yml配置文件变更的
@PostConstruct public void init() { File confDir = Util.getConfDirPath(); try { FileAlterationObserver observer = new FileAlterationObserver(confDir, FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.prefixFileFilter("application"), FileFilterUtils.suffixFileFilter("yml"))); FileListener listener = new FileListener(); observer.addListener(listener); fileMonitor = new FileAlterationMonitor(3000, observer); fileMonitor.start(); } catch (Exception e) { logger.error(e.getMessage(), e); } }
是不是感觉这个和上面的RemoteConfigLoader.startMonitor()有一点相似和重复? 其实并没有,它俩承担了不同的职能:
  1. RemoteConfigLoader.startMonitor()是负责把mysql里的配置下载到本地磁盘上
  1. ApplicationConfigMonitor是负责监听本地磁盘上文件的变化,进而触发应用内的相关对象的变化
另外,这个类只监听了application.yml,adapter的配置文件监听会在后面看到

CanalAdapterService

这可能是本篇最复杂的一个启动类讲解了。以下的代码片段都做过一些简化处理,只保留了和流程相关的部分,方便大家理解。
// com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService#init @PostConstruct public synchronized void init() { if (running) { return; } adapterLoader = new CanalAdapterLoader(adapterCanalConfig); // 初始化CanalAdapterLoader adapterLoader.init(); running = true; } // com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader#init public void init() { loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class); // 根据application.yml里的配置来创建AdapterProcessor for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) { for (CanalClientConfig.Group group : canalAdapter.getGroups()) { List<List<OuterAdapter>> canalOuterAdapterGroups = new CopyOnWriteArrayList<>(); List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>(); // 对于每个配置的outerAdapter都会去load对应的Adapter,并加入到canalOuterAdapters for (OuterAdapterConfig config : group.getOuterAdapters()) { loadAdapter(config, canalOuterAdapters); } // 这个list有点匪夷所思,因为不管怎么样,这个list的长度永远都是1... canalOuterAdapterGroups.add(canalOuterAdapters); // AdapterProcessor 是个核心类,后面我们分析 AdapterProcessor adapterProcessor = canalAdapterProcessors.computeIfAbsent(canalAdapter.getInstance() + "|" + StringUtils.trimToEmpty(group.getGroupId()), f -> new AdapterProcessor(canalClientConfig, canalAdapter.getInstance(), group.getGroupId(), canalOuterAdapterGroups)); // 启动 AdapterProcessor adapterProcessor.start(); + group.getGroupId()); } } } private void loadAdapter(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) { // 简化后的代码 OuterAdapter adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey())); adapter.init(config, evnProperties); canalOutConnectors.add(adapter); }
由于我们这里主要关注的是启动流程,会选择性忽略一些类加载、SPI相关代码的分析。 上述代码间写了一些注释,可以辅助理解,这里再做一些说明:
  1. 首先,因为核心逻辑主要围绕application.yml这个配置文件,所以你需要对这个配置文件的结构有一定的了解,主要了解canalAdapters这个配置项 参考链接
  1. 看完配置项之后我再大概解释一下,其实canalAdapters有点像配置MQ的消费组一样,对于每个canalAdapter,你需要配置
    1. 1个需要监听的topic
    2. 1-n个消费组标识groupId
    3. 每个消费组内可以配置1-n个串行消费动作outerAdapters
  1. 同一个topic可以配置多个消费组,每个消费组可以配置多个消费动作
  1. 每一个消费组会生成一个AdapterProcessor,会调用每个AdapterProcessor的start方法

AdapterProcessor

下面分析核心类AdapterProcessor,先看构造函数
public AdapterProcessor(CanalClientConfig canalClientConfig, String destination, String groupId, List<List<OuterAdapter>> canalOuterAdapters){ // application.yml配置 this.canalClientConfig = canalClientConfig; // 监听topic this.canalDestination = destination; // 消费组标识 this.groupId = groupId; // 消费动作 this.canalOuterAdapters = canalOuterAdapters; // 永远都是size为1的线程池 this.groupInnerExecutorService = Util.newFixedThreadPool(canalOuterAdapters.size(), 5000L); syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class); // 加载canalMsgConsumer,这个可以理解是MQ的代理对象 ExtensionLoader<CanalMsgConsumer> loader = new ExtensionLoader<>(CanalMsgConsumer.class); canalMsgConsumer = loader .getExtension(canalClientConfig.getMode().toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR); Properties properties = canalClientConfig.getConsumerProperties(); properties.put(CanalConstants.CANAL_MQ_FLAT_MESSAGE, canalClientConfig.getFlatMessage()); properties.put(CanalConstants.CANAL_ALIYUN_ACCESS_KEY, canalClientConfig.getAccessKey()); properties.put(CanalConstants.CANAL_ALIYUN_SECRET_KEY, canalClientConfig.getSecretKey()); ClassLoader cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(canalMsgConsumer.getClass().getClassLoader()); // 初始化 CanalMsgConsumer canalMsgConsumer.init(properties, canalDestination, groupId); Thread.currentThread().setContextClassLoader(cl); }
既然看到CanalMsgConsumer了,我们简单看下它的接口定义,会更容易理解:
public interface CanalMsgConsumer { void init(Properties properties, String topic, String groupId); void connect(); List<CommonMessage> getMessage(Long timeout, TimeUnit unit); void ack(); void rollback(); void disconnect(); }
看着这几个接口定义,应该大致都能猜到消费的流程了,不急,我们还是耐着性子看代码:
public void start() { if (!running) { thread = new Thread(this::process); thread.setUncaughtExceptionHandler(handler); thread.start(); running = true; } }
AdapterProcessor的start方法,创建并启动了一个线程,核心逻辑process
private void process() { // 这里为什么需要2个while 不是很懂 while (!running) { // waiting until running == true while (!running) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } // 消息重试次数 int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries(); if (retry == -1) { retry = Integer.MAX_VALUE; } while (running) { try { // 连接MQ,理论上应该是订阅topic,一会看具体代码 canalMsgConsumer.connect(); while (running) { if (!running) { break; } for (int i = 0; i < retry; i++) { if (!running) { break; } try { // pull消息 List<CommonMessage> commonMessages = canalMsgConsumer .getMessage(this.canalClientConfig.getTimeout(), TimeUnit.MILLISECONDS); // 如果超时没有pull到消息,理论上这里最好加个非空判断 // 消费逻辑 writeOut(commonMessages); // 消费成功之后ack canalMsgConsumer.ack(); break; } catch (Exception e) { // 如果消费重试retry次依旧报错,那么就直接ack掉了(注意:这样会丢消息) if (i != retry - 1) { canalMsgConsumer.rollback(); } else { canalMsgConsumer.ack(); } Thread.sleep(500); } } } // 断开与MQ的连接 canalMsgConsumer.disconnect(); } catch (Throwable e) { logger.error("process error!", e); } if (running) { // is reconnect try { Thread.sleep(1000); } catch (InterruptedException e) { // ignore } } } }
线程里的逻辑很简单:
  1. 向mq订阅对应的topic
  1. 去mq里拉取消息
  1. 处理消息
  1. 向mq ack消息
  1. 重复第2步
对应CanalMsgConsumer的具体实现我们就不看了,这里重点再看下处理消息的逻辑(writeOut):
public void writeOut(final List<CommonMessage> commonMessages) { List<Future<Boolean>> futures = new ArrayList<>(); // 这个canalOuterAdapters size永远都是1,其实不需要foreach canalOuterAdapters.forEach(outerAdapters -> { futures.add(groupInnerExecutorService.submit(() -> { try { // 组内适配器穿行运行,尽量不要配置组内适配器 outerAdapters.forEach(adapter -> { List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, groupId, commonMessages); batchSync(dmls, adapter); }); return true; } catch (Exception e) { logger.error(e.getMessage(), e); return false; } })); RuntimeException exception = null; for (Future<Boolean> future : futures) { try { if (!future.get()) { exception = new RuntimeException("Outer adapter sync failed! "); } } catch (Exception e) { exception = new RuntimeException(e); } } if (exception != null) { throw exception; } }); }
如果你仔细看到了这里,我想你应该能发现,上面这段代码其实不用这么复杂,因为canalOuterAdapters的长度永远都是1,不存在并行的场景,直接串行执行组内适配器即可:
public void writeOut(final List<CommonMessage> commonMessages) { canalOuterAdapters.get(0).forEach(adapter -> { // 消息类型转换 List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, groupId, commonMessages); batchSync(dmls, adapter); }); }
再看看batchSync的代码:
private void batchSync(List<Dml> dmls, OuterAdapter adapter) { // 分批同步 if (dmls.size() <= canalClientConfig.getSyncBatchSize()) { adapter.sync(dmls); } else { int len = 0; List<Dml> dmlsBatch = new ArrayList<>(); for (Dml dml : dmls) { dmlsBatch.add(dml); if (dml.getData() == null || dml.getData().isEmpty()) { len += 1; } else { len += dml.getData().size(); } if (len >= canalClientConfig.getSyncBatchSize()) { adapter.sync(dmlsBatch); dmlsBatch.clear(); len = 0; } } if (!dmlsBatch.isEmpty()) { adapter.sync(dmlsBatch); } } }
支持了分批同步,上面的分支逻辑还算清晰:
  1. 如果总消息数都没达到1个批次的数量,那么直接同步
  1. 如果总消息数超过了1个批次的数量,那么就1批批的做同步
    1. 如果最终还剩余不到1批次的数量,那么再同步一次
好了,核心的消息逻辑就分析完了,adapter.sync跟具体的实现挂钩了
notion image
设计到具体的实现逻辑,这块我们后面再单独写。

总结

本篇文章重点分析了canal-adapter的启动流程。并且推荐以远程配置的方式运行canal-adapter。在这里再总结一下整个启动流程:
  1. 连接mysql加载主配置文件和adapter相关配置文件
  1. 启动定时器轮询配置文件,发现有变化则下载到本地
  1. 启动定时器监听本地配置文件,发现有变化则触发应用读取文件并reload相应组件
  1. 针对于每一个topic / instance,可配置多个消费组,每个消费组可以配置一组消费单元,组内消费单元串行执行
  1. 每个消费组只有一个线程去拉取消息,但是可以通过多实例来控制线程拉取的topic的队列数来达到水平扩展的效果
线上问题分析——一次线上OOM问题的分析与解决RocketMQ使用docker启动导致2台消费者实例instanceId相同