URL
date
AI summary
slug
status
tags
summary
type
项目介绍
之前写过一篇文章整体介绍了一下canal的核心组件
文中是这么描述canal-client-adapter的
canal-client-adapter,也属于canal-client,可以把它理解为官方提供的一些常见的异构数据同步通道的实现,比如同步到elasticsearch、hbase、mongodb、mysql、redis、kafka等。所以和canal-client一样,它支持TCP和MQ的消费模式。 当然如果预设的满足不了,那就还是需要自定义了,可以选择基于canal-client-adapter的框架自定义扩展,也可以自己消费对应的MQ或者store去实现。
canal-client-adapter是canal官方提供的整个数据异构工程的最后一个环节。通过抽象整个同步流程,屏蔽不同异构数据源的api,构建了一套异构数据同步的框架。
项目结构
全篇文章基于canal-1.1.5版本分析
我们先来看看这个项目的模块分层
上面这些模块,主要分成三类:
- launcher——启动模块,里面主要是启动类加一些核心配置
- common——公共模块
- 其他——都是具体的adapter实现模块,比如es7x,就是具体用es7x的客户端同步到es里
启动流程分析
启动类还是最直观,最容易跟踪的东西。那我们就从launcher模块入手
这里面和启动流程相关的类有:
- BootstrapConfiguration
- CanalAdapterService
- ApplicationConfigMonitor
- CanalAdapterApplication
下面我们按照从易到难的顺序来分析:CanalAdapterApplication > BootstrapConfiguration > ApplicationConfigMonitor > CanalAdapterApplication
CanalAdapterApplication
@SpringBootApplication public class CanalAdapterApplication { public static void main(String[] args) { SpringApplication application = new SpringApplication(CanalAdapterApplication.class); application.setBannerMode(Banner.Mode.OFF); application.run(args); } }
只是个单纯的SpringBoot启动类,我们可以通过启动这个类在本地调试CanalClientAdapter
BootstrapConfiguration
canal-adapter支持两种配置模式:
- 从mysql读取配置
- 从配置文件读取配置
而这个类主要就是用来从数据库加载远程配置文件的:
public class BootstrapConfiguration { @Autowired private Environment env; private RemoteConfigLoader remoteConfigLoader = null; @PostConstruct public void loadRemoteConfig() { remoteConfigLoader = RemoteConfigLoaderFactory.getRemoteConfigLoader(env); if (remoteConfigLoader != null) { remoteConfigLoader.loadRemoteConfig(); remoteConfigLoader.loadRemoteAdapterConfigs(); remoteConfigLoader.startMonitor(); // 启动监听 } } @PreDestroy public synchronized void destroy() { if (remoteConfigLoader != null) { remoteConfigLoader.destroy(); } } } // com.alibaba.otter.canal.adapter.launcher.monitor.remote.RemoteConfigLoaderFactory#getRemoteConfigLoader public static RemoteConfigLoader getRemoteConfigLoader(Environment env) { try { String jdbcUrl = env.getProperty("canal.manager.jdbc.url"); if (!StringUtils.isEmpty(jdbcUrl)) { // load remote config String driverName = env.getProperty("canal.manager.jdbc.driverName"); String jdbcUsername = env.getProperty("canal.manager.jdbc.username"); String jdbcPassword = env.getProperty("canal.manager.jdbc.password"); return new DbRemoteConfigLoader(driverName, jdbcUrl, jdbcUsername, jdbcPassword); } // 可扩展其它远程配置加载器 } catch (Exception e) { logger.error(e.getMessage(), e); } return null; }
可以看到,上面远程加载配置文件的流程总共分成三个核心动作(line 12-14):
loadRemoteConfig
——从mysql下载application.yml
主配置文件到本地磁盘
loadRemoteAdapterConfigs
——从mysql下载adapter相关配置文件到本地磁盘
startMonitor
——启动监听,定时触发loadRemoteConfig
和loadRemoteAdapterConfigs
public void startMonitor() { // 监听application.yml变化 executor.scheduleWithFixedDelay(() -> { try { loadRemoteConfig(); } catch (Throwable e) { logger.error("scan remote application.yml failed", e); } }, 10, 3, TimeUnit.SECONDS); // 监听adapter变化 executor.scheduleWithFixedDelay(() -> { try { loadRemoteAdapterConfigs(); } catch (Throwable e) { logger.error("scan remote adapter configs failed", e); } }, 10, 3, TimeUnit.SECONDS); }
loadRemoteConfig
和loadRemoteAdapterConfigs
的具体代码逻辑也比较简单。loadRemoteConfig
通过下列sql去表canal_config找一条id为2的特定记录,这条记录的content就是canal-adapter的主配置文件
application.yml
select name, content, modified_time from canal_config where id=2
loadRemoteAdapterConfigs
adapter的相关配置是记录在canal_adapter_config表里的,这张表每一条记录就代表一个adapter配置文件
因为记录条数可能会比较多,所以这里的查询略有优化。首先会通过一个简单查询(不包括content配置文件字段),然后根据结果和本地的配置文件做modified_time的比对,如果发现有变更再会去mysql拉取有变更的记录的content
select id, category, name, modified_time from canal_adapter_config select id, category, name, content, modified_time from canal_adapter_config where id in ()
上面从远程下载的两种配置文件非常的重要,这里分别介绍一下:
application.yml
主配置文件主要用来配置:
- 消费模式(直连canal还是通过消息队列)
- 订阅关系(topic、消费组、消费方式(或者叫异构模式))
- 数据源定义(主要是消费消息时用来做一些数据查询转换以及etl时的数据源)
adapter.yml
adapter相关的配置文件。主要用来定义数据的清洗逻辑、转换逻辑
还有一点需要注意,该类是通过下面这个配置文件注入的
org.springframework.cloud.bootstrap.BootstrapConfiguration=\\ com.alibaba.otter.canal.adapter.launcher.config.BootstrapConfiguration
这里用了比较取巧的方式,为了让这个类更早的被初始化,然后去加载配置。
并且,还要把应用里的
application.yml
等配置文件都删掉才能生效,不然会直接追加到本地配置文件application.yml
下图是从官方repo里介绍“从远程加载配置”的用法:
ApplicationConfigMonitor
这个类如它的名字一样,主要是用来监听
application.yml
配置文件变更的@PostConstruct public void init() { File confDir = Util.getConfDirPath(); try { FileAlterationObserver observer = new FileAlterationObserver(confDir, FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.prefixFileFilter("application"), FileFilterUtils.suffixFileFilter("yml"))); FileListener listener = new FileListener(); observer.addListener(listener); fileMonitor = new FileAlterationMonitor(3000, observer); fileMonitor.start(); } catch (Exception e) { logger.error(e.getMessage(), e); } }
是不是感觉这个和上面的
RemoteConfigLoader.startMonitor()
有一点相似和重复?
其实并没有,它俩承担了不同的职能:RemoteConfigLoader.startMonitor()
是负责把mysql里的配置下载到本地磁盘上
ApplicationConfigMonitor
是负责监听本地磁盘上文件的变化,进而触发应用内的相关对象的变化
另外,这个类只监听了
application.yml
,adapter的配置文件监听会在后面看到CanalAdapterService
这可能是本篇最复杂的一个启动类讲解了。以下的代码片段都做过一些简化处理,只保留了和流程相关的部分,方便大家理解。
// com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService#init @PostConstruct public synchronized void init() { if (running) { return; } adapterLoader = new CanalAdapterLoader(adapterCanalConfig); // 初始化CanalAdapterLoader adapterLoader.init(); running = true; } // com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader#init public void init() { loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class); // 根据application.yml里的配置来创建AdapterProcessor for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) { for (CanalClientConfig.Group group : canalAdapter.getGroups()) { List<List<OuterAdapter>> canalOuterAdapterGroups = new CopyOnWriteArrayList<>(); List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>(); // 对于每个配置的outerAdapter都会去load对应的Adapter,并加入到canalOuterAdapters for (OuterAdapterConfig config : group.getOuterAdapters()) { loadAdapter(config, canalOuterAdapters); } // 这个list有点匪夷所思,因为不管怎么样,这个list的长度永远都是1... canalOuterAdapterGroups.add(canalOuterAdapters); // AdapterProcessor 是个核心类,后面我们分析 AdapterProcessor adapterProcessor = canalAdapterProcessors.computeIfAbsent(canalAdapter.getInstance() + "|" + StringUtils.trimToEmpty(group.getGroupId()), f -> new AdapterProcessor(canalClientConfig, canalAdapter.getInstance(), group.getGroupId(), canalOuterAdapterGroups)); // 启动 AdapterProcessor adapterProcessor.start(); + group.getGroupId()); } } } private void loadAdapter(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) { // 简化后的代码 OuterAdapter adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey())); adapter.init(config, evnProperties); canalOutConnectors.add(adapter); }
由于我们这里主要关注的是启动流程,会选择性忽略一些类加载、SPI相关代码的分析。
上述代码间写了一些注释,可以辅助理解,这里再做一些说明:
- 首先,因为核心逻辑主要围绕application.yml这个配置文件,所以你需要对这个配置文件的结构有一定的了解,主要了解canalAdapters这个配置项 参考链接
- 看完配置项之后我再大概解释一下,其实canalAdapters有点像配置MQ的消费组一样,对于每个canalAdapter,你需要配置
- 1个需要监听的topic
- 1-n个消费组标识groupId
- 每个消费组内可以配置1-n个串行消费动作outerAdapters
- 同一个topic可以配置多个消费组,每个消费组可以配置多个消费动作
- 每一个消费组会生成一个AdapterProcessor,会调用每个AdapterProcessor的start方法
AdapterProcessor
下面分析核心类AdapterProcessor,先看构造函数
public AdapterProcessor(CanalClientConfig canalClientConfig, String destination, String groupId, List<List<OuterAdapter>> canalOuterAdapters){ // application.yml配置 this.canalClientConfig = canalClientConfig; // 监听topic this.canalDestination = destination; // 消费组标识 this.groupId = groupId; // 消费动作 this.canalOuterAdapters = canalOuterAdapters; // 永远都是size为1的线程池 this.groupInnerExecutorService = Util.newFixedThreadPool(canalOuterAdapters.size(), 5000L); syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class); // 加载canalMsgConsumer,这个可以理解是MQ的代理对象 ExtensionLoader<CanalMsgConsumer> loader = new ExtensionLoader<>(CanalMsgConsumer.class); canalMsgConsumer = loader .getExtension(canalClientConfig.getMode().toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR); Properties properties = canalClientConfig.getConsumerProperties(); properties.put(CanalConstants.CANAL_MQ_FLAT_MESSAGE, canalClientConfig.getFlatMessage()); properties.put(CanalConstants.CANAL_ALIYUN_ACCESS_KEY, canalClientConfig.getAccessKey()); properties.put(CanalConstants.CANAL_ALIYUN_SECRET_KEY, canalClientConfig.getSecretKey()); ClassLoader cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(canalMsgConsumer.getClass().getClassLoader()); // 初始化 CanalMsgConsumer canalMsgConsumer.init(properties, canalDestination, groupId); Thread.currentThread().setContextClassLoader(cl); }
既然看到CanalMsgConsumer了,我们简单看下它的接口定义,会更容易理解:
public interface CanalMsgConsumer { void init(Properties properties, String topic, String groupId); void connect(); List<CommonMessage> getMessage(Long timeout, TimeUnit unit); void ack(); void rollback(); void disconnect(); }
看着这几个接口定义,应该大致都能猜到消费的流程了,不急,我们还是耐着性子看代码:
public void start() { if (!running) { thread = new Thread(this::process); thread.setUncaughtExceptionHandler(handler); thread.start(); running = true; } }
AdapterProcessor的start方法,创建并启动了一个线程,核心逻辑
process
:private void process() { // 这里为什么需要2个while 不是很懂 while (!running) { // waiting until running == true while (!running) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } // 消息重试次数 int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries(); if (retry == -1) { retry = Integer.MAX_VALUE; } while (running) { try { // 连接MQ,理论上应该是订阅topic,一会看具体代码 canalMsgConsumer.connect(); while (running) { if (!running) { break; } for (int i = 0; i < retry; i++) { if (!running) { break; } try { // pull消息 List<CommonMessage> commonMessages = canalMsgConsumer .getMessage(this.canalClientConfig.getTimeout(), TimeUnit.MILLISECONDS); // 如果超时没有pull到消息,理论上这里最好加个非空判断 // 消费逻辑 writeOut(commonMessages); // 消费成功之后ack canalMsgConsumer.ack(); break; } catch (Exception e) { // 如果消费重试retry次依旧报错,那么就直接ack掉了(注意:这样会丢消息) if (i != retry - 1) { canalMsgConsumer.rollback(); } else { canalMsgConsumer.ack(); } Thread.sleep(500); } } } // 断开与MQ的连接 canalMsgConsumer.disconnect(); } catch (Throwable e) { logger.error("process error!", e); } if (running) { // is reconnect try { Thread.sleep(1000); } catch (InterruptedException e) { // ignore } } } }
线程里的逻辑很简单:
- 向mq订阅对应的topic
- 去mq里拉取消息
- 处理消息
- 向mq ack消息
- 重复第2步
对应CanalMsgConsumer的具体实现我们就不看了,这里重点再看下处理消息的逻辑(writeOut):
public void writeOut(final List<CommonMessage> commonMessages) { List<Future<Boolean>> futures = new ArrayList<>(); // 这个canalOuterAdapters size永远都是1,其实不需要foreach canalOuterAdapters.forEach(outerAdapters -> { futures.add(groupInnerExecutorService.submit(() -> { try { // 组内适配器穿行运行,尽量不要配置组内适配器 outerAdapters.forEach(adapter -> { List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, groupId, commonMessages); batchSync(dmls, adapter); }); return true; } catch (Exception e) { logger.error(e.getMessage(), e); return false; } })); RuntimeException exception = null; for (Future<Boolean> future : futures) { try { if (!future.get()) { exception = new RuntimeException("Outer adapter sync failed! "); } } catch (Exception e) { exception = new RuntimeException(e); } } if (exception != null) { throw exception; } }); }
如果你仔细看到了这里,我想你应该能发现,上面这段代码其实不用这么复杂,因为canalOuterAdapters的长度永远都是1,不存在并行的场景,直接串行执行组内适配器即可:
public void writeOut(final List<CommonMessage> commonMessages) { canalOuterAdapters.get(0).forEach(adapter -> { // 消息类型转换 List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, groupId, commonMessages); batchSync(dmls, adapter); }); }
再看看batchSync的代码:
private void batchSync(List<Dml> dmls, OuterAdapter adapter) { // 分批同步 if (dmls.size() <= canalClientConfig.getSyncBatchSize()) { adapter.sync(dmls); } else { int len = 0; List<Dml> dmlsBatch = new ArrayList<>(); for (Dml dml : dmls) { dmlsBatch.add(dml); if (dml.getData() == null || dml.getData().isEmpty()) { len += 1; } else { len += dml.getData().size(); } if (len >= canalClientConfig.getSyncBatchSize()) { adapter.sync(dmlsBatch); dmlsBatch.clear(); len = 0; } } if (!dmlsBatch.isEmpty()) { adapter.sync(dmlsBatch); } } }
支持了分批同步,上面的分支逻辑还算清晰:
- 如果总消息数都没达到1个批次的数量,那么直接同步
- 如果总消息数超过了1个批次的数量,那么就1批批的做同步
- 如果最终还剩余不到1批次的数量,那么再同步一次
好了,核心的消息逻辑就分析完了,adapter.sync跟具体的实现挂钩了
设计到具体的实现逻辑,这块我们后面再单独写。
总结
本篇文章重点分析了canal-adapter的启动流程。并且推荐以远程配置的方式运行canal-adapter。在这里再总结一下整个启动流程:
- 连接mysql加载主配置文件和adapter相关配置文件
- 启动定时器轮询配置文件,发现有变化则下载到本地
- 启动定时器监听本地配置文件,发现有变化则触发应用读取文件并reload相应组件
- 针对于每一个topic / instance,可配置多个消费组,每个消费组可以配置一组消费单元,组内消费单元串行执行
- 每个消费组只有一个线程去拉取消息,但是可以通过多实例来控制线程拉取的topic的队列数来达到水平扩展的效果
- 作者:黑微狗
- 链接:https://blog.hwgzhu.com/article/canal-client-adapter-core-code-bootstrap
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
相关文章