URL
date
AI summary
slug
status
tags
summary
type

引言

作为开发人员,我们经常遇到需要追踪数据更新轨迹的需求。本文旨在探索离线binlog文件的解析方法,帮助我们快速找到数据的变更历史。
在解析binlog文件时,我们通常有两种模式:
  • 在线解析:直接连接MySQL服务器解析binlog文件。
  • 离线解析:处理存储在本地的binlog文件。
本文将主要讨论后者—离线解析模式。
💡
之前写过一篇文章介绍了binlog events的结构,强烈推荐还没阅读过的同学先去看看,对于你理解本篇文章肯定大有裨益。

工具调研

在工具选择上,我们调研了多种解决方案,包括阿里云DMS、mysqlbinlog、binlog2sql、bingo2sql等,评估了它们的优缺点。

阿里云DMS数据追踪

阿里云的dms提供了一个叫做数据追踪的功能,但是"追踪区间最大为48小时"

优点

用户界面友好,产品功能集成

缺点

追踪区间限制为48小时,不适合长期数据追踪。

mysqlbinlog

这个是官方自带的binlog解析工具。

优点

官方出品,兼容性最好,bug相对最少。

缺点

  1. 没有列名的补全,只有列对应的位置信息
  1. 过滤能力很有限,比如不支持表名过滤,sql类型过滤
没有列名补全的日志可读性是比较差的,我贴出一小段便于你感受一下
#231129 10:00:03 server id 1145804703 end_log_pos 13184 CRC32 0x410c1d23 Table_map: `school`.`bureau_statistics` mapped to number 159 # Columns(BIGINT NOT NULL, # BIGINT NOT NULL, # BIGINT NOT NULL, # BIGINT, # INT, # INT, # INT, # INT, # INT, # INT, # DECIMAL(16,2), # DATETIME NOT NULL, # DATETIME NOT NULL) # at 13184 #231129 10:00:03 server id 1145804703 end_log_pos 13372 CRC32 0x70f00704 Update_rows_v1: table id 159 flags: STMT_END_F ### UPDATE `school`.`bureau_statistics` ### WHERE ### @1=430 ### @2=198 ### @3=164325514417812 ### @4=2260 ### @5=43 ### @6=42404 ### @7=2216 ### @8=940 ### @9=35 ### @10=0 ### @11=11695475.00 ### @12='2023-09-12 22:17:13' ### @13='2023-11-29 09:55:00' ### SET ### @1=430 ### @2=198 ### @3=164325514417812 ### @4=2260 ### @5=43 ### @6=42404 ### @7=2216 ### @8=940 ### @9=35 ### @10=0 ### @11=11695475.00 ### @12='2023-09-12 22:17:13' ### @13='2023-11-29 10:00:00' # at 13372
不过这就是binlog最原始的样子,binlog本身只记录了列的位置信息。因为这对于数据复制和数据恢复来说已经足够了,保存的信息越少,对于存储和传输效率来说肯定越好。
那么我们是否能把列位置还原成列名呢?答案是肯定的,我们只要知道当时的表结构就可以了。那么如何得到“当时”的表结构呢?一个简单的小技巧,可以使用“当前”的表结构。是的,你没有听错,用最简单的方式解决了大多数的场景,这也不失为一种“良好”的解决方案。而如果要严谨的获取“当时”的表结构,肯定需要快照信息,这对于一个binlog文件解析工具来说就有点超出范畴了。
后面的三种开源项目都支持列还原,并且也都是用“当前”的表结构来实现的。不过“当前”表结构的获取也分在线和离线两种方式。
💡
这里我们再定义两种获取表结构的模式,也分为在线和离线两种模式,后文你需要结合上下文来判断指代的是binlog解析模式还是表结构获取模式 - 在线模式:指的是直接连接到mysql获取实时表结构 - 离线模式:指的是通过一份DDL文件来获取表结构

binlog2sql

binlog2sql就是直接使用“当前”表结构来补全列信息的一个典型工具,不过它只支持在线解析+在线表结构获取。而对于在线解析来说,使用“当前”表结构听起来似乎就更加“合理”了。不过binlog2sql已经5年没更新了,社区不活跃。初步测试了下,master代码不支持mysql 8.0.25,需要更新requirements.txt升级一下依赖(https://github.com/danfengcao/binlog2sql/issues/66)
PyMySQL==0.9.3 wheel==0.29.0 mysql-replication==0.21

优点

  1. 支持库表维度过滤
  1. 支持sql类型过滤
  1. 支持生成回滚SQL

缺点

  1. 不支持离线解析
  1. 性能较差
  1. 社区不活跃

bingo2sql

bingo2sql可以看作是binlog2sql的升级版,它继承了binlog2sql的所有优点,而对于binlog2sql的缺点它也做了补足,比如它增加支持了离线模式,也优化了性能。不过在测试离线模式的时候,发现对于本地表结构DDL不支持collation utf8mb4_0900_ai_ci
ERRO[2023-12-04T17:14:26+08:00] 读取表结构文件失败(ddl.sql): 解析失败: [ddl:1273]Unknown collation: 'utf8mb4_0900_ai_ci' file=bingo2sql.go line=182
删掉之后就可以了。不过DDL里没有指定库名,只指定了表名,不知道对于不同库名同表名的场景是如何支持的(也没有具体去测)。

优点

几乎涵盖了所有离线解析binlog的基本功能

缺点

  1. 离线表结构读取存在一些小问题
  1. 不支持多binlog文件的顺序解析,缺少对云rds的支持
整体来看,其实bingo2sql已经可以满足我本次的目标了。不过,我并没有止步于此,上面这2款Github上的开源解析工具一个是python写的,一个是go写的,竟然没有Java写的。我决定填补一下这个盲区,也扩展一些我想要的新功能!

my2sql

优点

支持一些分析统计的功能,比如能统计大事务长事务等

缺点

要自己编译go源代码运行

canal2sql

首先声明,这不是官方产品,是我自研的一个小工具。

设计初衷

主要是为了提供一个更全面的离线binlog解析工具,并填补上Java在这一块的空白,底层还是依赖canal,扩展了它的离线解析能力。
对Java开发来说,canal应该是很熟悉了。canal以在线解析binlog闻名,比如mysql->es的异构数据同步就是最典型的场景。而对于解析离线binlog文件,似乎没有过多的介绍,甚至在Github的wiki上貌似都没找到,在源码中也只是一个很简单的测试用例的存在。
看过它的源码之后,你会发现它的流程上大多数是复用了在线解析binlog的逻辑,这可能会导致在某些地方的设计以及扩展性不够好。不过既然官方已经实现了离线binlog文件的解析,那我们就从官方实现入手来实现我的需求。考虑到这个模块相对独立一些,于是把它移出来单独成为一个项目——canal2sql。
💡
关于官方给出的解析本地binlog的测试用例,在特定场景下存在bug,强烈建议你看看我之前写的相关文章,其中我提供了几种不同的解决思路,这有助于你更好的理解本篇文章的内容。

设计目标

既然要自研,那应该可以实现除了造火箭之外的任何功能。看看我们到底想要什么:
  1. 支持binlog的在线解析、离线解析
  1. 列还原同时支持在线模式和离线模式
  1. 支持生成回滚SQL
  1. 支持解析范围控制
    1. 时间维度
    2. position维度
  1. 支持对象过滤
    1. 库表维度
    2. sql类型维度
  1. 支持多binlog文件顺序解析
  1. 支持云rds binlog文件列表解析
  1. 支持分析统计功能
    1. 耗时事务统计
    2. 大事务统计
    3. 表维度/库维度 更新数据量统计
    4. qps统计
我的天,终于理解了为什么产品同学这么能提需求,当你成为一个用户或者产品设计者的时候,你真的能提出很多很多的点子,有些是从现实产品里汲取的点子,有些是自己作为用户最真实的需求。(截止文章发表,除了第8点统计分析其余功能全部实现)

核心改造点

前面提到了,canal的离线解析其实和在线解析复用了同一套代码,整体流程可以抽象为:
  1. 启动dump线程
  1. 连接底层数据源
  1. dump binlog
  1. 解析binlog
  1. 消费binlog
其中,两者最主要的不同就在第二步和第三步(通过模版模式来扩展):
  • 在线解析是连接mysql,而离线解析则是连接对应的binlog文件,
  • 在线解析是从mysql的实时binlog读取数据,而离线解析是从binlog文件路径读取数据
不过对于复用来说,很容易出现的一个问题就是可能会碰到一些“别扭”的地方。

解析范围控制逻辑收敛

首先,canal不支持end_position和end_time的过滤。其次,官方的离线解析对于start_time控制的代码逻辑实现得也有点奇怪和不优雅,这里我就不贴代码了,具体在com.alibaba.otter.canal.parse.inbound.mysql.LocalBinLogConnection#dump(long, com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor)有兴趣的同学可以去看看。
因为canal设计之初就是为了实时监听binlog,所以完全没有考虑终止条件,如果单纯的设计一个离线解析的产品,怎么可能不支持这个功能呢
我把控制解析范围的逻辑通过增加一个专门的类收敛掉了。除了在线解析场景下的start_position是mysql原生支持的参数,其余模式我们都需要遍历binlog文件去过滤:
public class LogEventFilter { public LogEvent filter(LogEvent event) { if (event == null) { return null; } String logFileName = event.getHeader().getLogFileName(); if (startTime != null && event.getWhen() < startTime.getTime() / 1000) { return null; } if (endTime != null && event.getWhen() > endTime.getTime() / 1000) { shutdownNow(); return null; } // binlog 文件需要从头遍历,而online模式可以直接从指定位置读 if (startFile == null) { if (startPosition != null && event.getLogPos() < startPosition) { return null; } } else { if (startFile.equals(logFileName) && startPosition != null && event.getLogPos() < startPosition) { return null; } } if (endFile == null) { if (endPosition != null && event.getLogPos() > endPosition) { shutdownNow(); return null; } } else { if (endFile.equals(logFileName) && endPosition != null && event.getLogPos() > endPosition) { shutdownNow(); return null; } } return event; } private void shutdownNow() { System.exit(1); } }

终止条件支持

另外,对于end_position和end_time来说,不仅是过滤条件,也是终止条件。而在复用的代码框架下,我们没有办法很优雅的停止程序,只能通过System.exit(1)来终止进程(上述代码里的shutdownNow)。而这个filter代码块的调用位置也很有讲究,因为canal在并行模式下,是有三个消费阶段的:SimpleParserStage -> DmlParserStage -> SinkStoreStage,对应也是三组消费线程。我们必须加在第三个阶段SinkStoreStage,不然日志输出有可能丢失。
消费逻辑都在MultiStageCoprocessor里,所以,改它:
public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor { private class SinkStoreStage implements EventHandler<MessageEvent>, LifecycleAware { public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception { try { LogEvent logEvent = event.getEvent(); if (event.getEntry() != null) { if (logEventFilter == null || logEventFilter.filter(logEvent) != null) { transactionBuffer.add(event.getEntry()); } } if (connection instanceof MysqlConnection && logEvent.getSemival() == 1) { // semi ack回报 ((MysqlConnection) connection).sendSemiAck(logEvent.getHeader().getLogFileName(), logEvent.getHeader().getLogPos()); } // clear for gc event.setBuffer(null); event.setEvent(null); event.setTable(null); event.setEntry(null); event.setNeedDmlParse(false); } catch (Throwable e) { exception = new CanalParseException(e); throw exception; } } } }

增加支持通过本地DDL文件还原列

官方的离线解析是通过指定数据库实时获取表结构来还原列信息的。而我们需要扩展本地DDL文件的模式。得益于canal对于表结构元数据的良好设计(不了解的同学可以参考https://github.com/alibaba/canal/wiki/TableMetaTSDB)。我只是在原有基础上做了一些微调,支持了读取本地文件来构建TableMetaCache
public class TableMetaCache { public TableMetaCache(String fileName){ String createDDLs; try { createDDLs = FileUtils.readFileToString(new File(fileName)); } catch (IOException e) { throw new RuntimeException(e); } MemoryTableMeta memoryTableMeta = new MemoryTableMeta(); String[] createDDLList = createDDLs.split(";"); for (String createDDL : createDDLList) { memoryTableMeta.apply(DatabaseTableMeta.INIT_POSITION, null, createDDL, null); } this.tableMetaDB = CacheBuilder.newBuilder().build(new CacheLoader<String, TableMeta>() { @Override public TableMeta load(String name) throws Exception { String[] names = StringUtils.split(name, "`.`"); String schema = names[0]; String table = names[1]; return memoryTableMeta.find(schema, table); } }); } }
在Parser的preDump阶段构建好TableMetaCache,如果指定了本地DDL文件,优先使用
@Override protected void preDump(ErosaConnection connection) { if (StringUtils.isNotBlank(ddlFile)) { tableMetaCache = new TableMetaCache(ddlFile); } else { metaConnection = buildMysqlConnection(); try { metaConnection.connect(); } catch (IOException e) { throw new CanalParseException(e); } if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) { ((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter); ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire); ((DatabaseTableMeta) tableMetaTSDB).init(destination); } tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB); } ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache); }

支持多协议离线binlog文件的读取

原生canal的离线解析只支持本地文件协议,而我希望读取远程的binlog文件(为后续读取阿里云rds的binlog文件做准备)。两者只是协议不一样,是不是可以用URL.openConnection来解决这个问题?
public class BinlogFileConnection implements ErosaConnection { public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException { try (BinlogFileLogFetcher fetcher = new BinlogFileLogFetcher(bufferSize)) { LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); fetcher.open(binlogFile, binlogPosition); context.setLogPosition(new LogPosition(binlogfilename, binlogPosition)); LogEvent event = null; while (fetcher.fetch()) { event = decoder.decode(fetcher, context); if (event == null) { continue; } if (!coprocessor.publish(event)) { break; } } } } }
这里的核心是BinlogFileLogFetcher,我开始的时候是原模原样照搬了FileLogFetcher逻辑:只修改了open方法的逻辑,不是去打开本地文件,而是通过URL去打开一个连接:
public final class BinlogFileLogFetcher extends LogFetcher { public void open(String urlStr, final long filePosition) throws FileNotFoundException, IOException { URL url = new URL(urlStr); URLConnection urlConnection = url.openConnection(); in = urlConnection.getInputStream(); // 省略后续代码 } }
没想到运行的时候报错了:
16:07:14.135 [destination = null , address = rm-bp1yjb2rh8249f1ft90130.mysql.rds.aliyuncs.com/192.168.111.235:3306 , EventParser] ERROR c.g.z.c.p.RemoteBinlogEventWithLocalDDLParser - dump address rm-bp1yjb2rh8249f1ft90130.mysql.rds.aliyuncs.com/192.168.111.235:3306 has an error, retrying. caused by java.lang.IllegalArgumentException: limit excceed: 3201 at com.taobao.tddl.dbsync.binlog.LogBuffer.getUint32(LogBuffer.java:532) ~[canal.parse.dbsync-1.1.5.jar:na] at com.taobao.tddl.dbsync.binlog.event.LogHeader.processCheckSum(LogHeader.java:299) ~[canal.parse.dbsync-1.1.5.jar:na] at com.taobao.tddl.dbsync.binlog.event.LogHeader.<init>(LogHeader.java:228) ~[canal.parse.dbsync-1.1.5.jar:na] at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:100) ~[canal.parse.dbsync-1.1.5.jar:na] at com.github.zhuchao941.canal2sql.RemoteBinLogConnection.dump(RemoteBinLogConnection.java:94) ~[classes/:na] at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:262) ~[canal.parse-1.1.5.jar:na] at java.lang.Thread.run(Thread.java:750) [na:1.8.0_333]
每次报错的异常信息里limit excceed后面的数值还不一样。并且同样的文件,我用原生的离线解析(FileInputStream)去读就没问题,那么可以排除文件本身的问题。那究竟是什么问题呢?只能更深入的去看源代码了

解决官方离线解析的Bug

我们找到报错链路上的源代码
public class LogBuffer { public final long getUint32(final int pos) { final int position = origin + pos; if (pos + 3 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 3))); byte[] buf = buffer; return ((long) (0xff & buf[position])) | ((long) (0xff & buf[position + 1]) << 8) | ((long) (0xff & buf[position + 2]) << 16) | ((long) (0xff & buf[position + 3]) << 24); } } public final class LogHeader { private void processCheckSum(LogBuffer buffer) { if (checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_OFF && checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_UNDEF) { crc = buffer.getUint32(eventLen - LogEvent.BINLOG_CHECKSUM_LEN); } } }
看异常堆栈信息应该是在LogDecoder解码header的时候,有一个步骤叫做processCheckSum,在这一步报错了。不过光看这两个方法,要把这个问题搞清楚还是挺有难度的。因为这一块都是解码binlog event协议的逻辑。我花了一些时间去了解了binlog event协议的格式、mysql协议的格式以及LogBuffer的处理逻辑,才找出了这个问题真正的原因,并给出了一个可以根本上解决问题的方案。
我先说一下结论:LogDecoder和FileLogFetcher各自都没有保证binlog event的完整性,导致LogDecoder在解码不完整的binlog event时(header是完整的但整个event不完整),会溢出。
如果看得不是很明白,我再详细解释一下:
  1. LogDecoder在解码binlog event时,会根据mysql的配置来决定要不要去获取binlog event的校验和CRC(不过貌似只有读取,没有看到用来做校验)
  1. 解码是分步进行的,先解码header,再通过header里的event_length解码body
  1. 所以对于数据流大小的判断也是分步的,满足header_length了,就可以先解码header
  1. 获取校验和的逻辑在解码header的过程中
  1. 而校验和在数据流是位于整个binlog event的最后4个字节
所以,如果在解码header的时候,buffer里还没有读入完整的binlog event,那么去获取校验和的时候就会发现要读取的位置溢出了,也就是上面的报错,limit excceed了。
刚刚说到的是LogDecoder,它主要负责解码buffer里的数据流。那么buffer里的数据是谁填充进来的呢?答案是LogFetcher。它们俩协同处理整个binlog event数据流。
那为什么在线解析和官方离线解析的时候没有这个问题呢? 在线解析是因为上层LogFetcher的实现类DirectLogFetcher,保证了binlog event的完整性。也就是说,DirectLogFetcher在往buffer里填充数据的时候,每次都会保证填充一个完整的event。这个是通过mysql协议的Packet来保证的。1个binlog event肯定被包在一个完整的Packet链里,上层DirectLogFetcher保证读完整个Packet
而官方离线解析“没问题”,并非真正的没问题。没暴露问题主要是因为buffer够大,并且由于是本地文件,LogFetcher每次fetch时调用inpustream.read(buffer, 0, buffer.length)方法都能读到buffer.length的数据长度(我不确定有没有这样的保障,至少从InputStream的javadoc看是不保证的,但是测下来是每次都如此)所以,只是“凑巧”没问题。你可以试试把buffer size调小,比如调到20,这个问题肯定是必现的。
而在通过http协议跨公网读取的情况下,使用inpustream.read(buffer, 0, buffer.length)方法,更容易导致每次读取到的数据存在不确定性(和你的网络、对端网络情况、中间传输路径都有关系)。此时出现上述limit excceed的可能性也就更大。
说完LogFetcher,我们再回过头来看看LogDecoder。如果说解码header一定要依赖校验和,那么就必须要求一个完整的event,这一层的设计里必须要能处理这种异常情况,而不能完全依赖上层。如果buffer里的可读数据不满足要求,那就交给上层LogFetcher继续读取。原来的流程里也有这样的设计,不过只考虑了buffer里的可读数据小于header size的场景。
既然上面的问题是LogFetcher和LogDecoder共同带来的,那解决的思路也有2个:
  1. 自定义一个LogFetcher在上层保证event的完整性
  1. 自定义LogDecoder,在下层发现buffer里的event不完整,那就交由上层继续填充
这两种方案随便哪一种实现了都可以解决问题,不过我采用了1、2都改的方式,保证它在每一层都是安全的。
自定义FileLogFetcher
public final class BinlogFileLogFetcher extends LogFetcher { public boolean fetch() throws IOException { if (in == null) { return false; } // 先判断当前buffer里的数据够不够一次事件 if (limit < FormatDescriptionLogEvent.LOG_EVENT_HEADER_LEN) { // 如果不到一个头就先读一个头 int length = readFully(in, buffer, origin + limit, FormatDescriptionLogEvent.LOG_EVENT_HEADER_LEN); if (length < 0) { return false; } limit += length; } long eventLen = getUint32(LogEvent.EVENT_LEN_OFFSET); if (limit >= eventLen) { // 足够一次完整事件,无需从底层读取数据 return true; } // 先确保buffer足够 ensureCapacity((int) eventLen); // 从底层读满一个事件 int length = readFully(in, buffer, origin + limit, (int) (eventLen - FormatDescriptionLogEvent.LOG_EVENT_HEADER_LEN)); if (length < 0) { return false; } limit += length; return true; } }
自定义LogDecoder
只简单的调整了几行代码,核心逻辑就是不分步解码header和body,只读取event_length,在确保event_length都在buffer里的时候,才同时解码header和body
notion image

aliyun模式

前面优先完善了单个binlog文件的解析逻辑,接下来主要解决多binlog文件的自动解析了(其实前面都是在为这个终极功能做铺垫)。因为现在大多都是用的云rds,我们用的阿里云,binlog是按照时间/大小维度滚动的,并且会自动备份到oss。所以如果想要找到某条数据的更新轨迹,很可能是要跨很多个binlog文件的,这个如果每次手动处理的话,人肉成本很高,所以必须产品化。流程:调用阿里云RDS的openapi根据时间范围查询binlog文件列表,按时间依次读取对应文件并解析。
public class AliyunBinlogFileConnection implements ErosaConnection { public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ"); String[] split = instanceId.split("\\\\|"); DescribeBinlogFilesResponse describeBinlogFilesResponse = AliyunClient.describeBinlogFiles(new DescribeBinlogFilesRequest().setDBInstanceId(split[0]).setStartTime(format.format(startTime)).setEndTime(format.format(endTime)), ak, sk); List<DescribeBinlogFilesResponseBody.DescribeBinlogFilesResponseBodyItemsBinLogFile> binLogFiles = describeBinlogFilesResponse.getBody().getItems().getBinLogFile(); if (split.length > 1) { binLogFiles = binLogFiles.stream().filter(binLogFile -> binLogFile.getHostInstanceID().equals(split[1])).collect(Collectors.toList()); } System.out.println("find binLogFile num:" + binLogFiles.size()); binLogFiles.sort(Comparator.comparing(o -> o.logBeginTime)); for (DescribeBinlogFilesResponseBody.DescribeBinlogFilesResponseBodyItemsBinLogFile binlogFile : binLogFiles) { System.out.println(String.format("fileName:%s, fileSize:%s, time:%s-%s", binlogFile.getLogFileName(), binlogFile.getFileSize(), binlogFile.getLogBeginTime(), binlogFile.getLogEndTime())); } for (DescribeBinlogFilesResponseBody.DescribeBinlogFilesResponseBodyItemsBinLogFile logFile : binLogFiles) { try (BinlogFileLogFetcher fetcher = new BinlogFileLogFetcher(bufferSize)) { LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); String downloadLink = internal ? logFile.intranetDownloadLink : logFile.downloadLink; fetcher.open(downloadLink, binlogPosition); System.out.println(String.format("# download file %s from %s", logFile.logFileName, downloadLink)); context.setLogPosition(new LogPosition(binlogfilename, binlogPosition)); LogEvent event = null; while (fetcher.fetch()) { event = decoder.decode(fetcher, context); if (event == null) { continue; } if (!coprocessor.publish(event)) { break; } } } } } }
由于oss上的binlog如果走公网下载的话要收费,所以我们支持了两种模式:公网和内网。内网不收费且速度更快,也是默认的模式。

总结

问题解决的差不多了,产品也做完了,整个过程还是碰到了不少的问题的。但是也实现了我想要的效果。以后再有人问“这条记录的更新轨迹”,我想应该能很快回复他了。甚至我们也可以产品化一个“数据追踪”的功能,但是不限制48小时的时长。

异常场景记录

因为对于列补全用的都是“实时”的表结构,所以假设中间改过表结构的话,可能会出现异常情况,这里总结一下:
  1. 在表的最后一列之后增加列,不影响
  1. 在其他位置增加列,会导致之前的binlog解析错乱
  1. 删除最后一列,不影响
  1. 删除其他列,会导致之前的binlog解析错乱
总结起来就是,只要列的位置发生了变化,就可能会影响到解析

参考

  1. MySQL-binlog解析工具
  1. 数据同步工具之FlinkCDC/Canal/Debezium对比
  1. MySql-Binlog 协议详解 - 报文篇
  1. 带你玩转 MySql - 协议篇
  1. 解读 MySQL Client/Server Protocol: Connection & Replication
  1. 关于binary log那些事——认真码了好长一篇
  1. MySQL的半同步是什么?
  1. MySQL Binlog 源码入门
  1. MySQL如何传输二进制日志
  1. 美团DB数据同步到数据仓库的架构与实践
Guava RateLimiter定制spring-cloud-gateway限流器来解决线上问题