URL
date
AI summary
slug
status
tags
summary
type
背景
采用批量化的操作来提升性能是一个很常规的思想,一般用于round trip开销比较大的场景。比如调用远程服务,操作数据库,发送MQ消息等等。其实除了减少了round trip的开销之外,批量化操作还提高了请求的利用率,因为每个请求一般都会携带一些”头“数据,比如tcp协议头、http协议头等等,通过批量操作这些头可能就从1000个减少到了1个。
今天我们主要聊聊数据库层面支持哪些批量操作以及它们的特点。
multi-value insert
这是我们最常使用的数据库批量操作的场景。之前还写过两篇文章介绍mybatis的批量插入引起的内存溢出以及解决方案,感兴趣的同学可以点击对应链接阅读。
这里的批量插入语句我们都是使用
mybatis-generator
配合com.itfsw:mybatis-generator-plugins
插件生成出来的。生成出来的内容大致如下,支持全字段插入和selective
插入:<insert id="batchInsert" keyColumn="id" keyProperty="id" parameterType="map" useGeneratedKeys="true"> insert into `file_task` (task_name, task_type, task_tag, task_param, upd_tm, ins_tm) values <foreach collection="list" item="item" separator=","> (#{item.taskName,jdbcType=VARCHAR}, #{item.taskType,jdbcType=TINYINT}, #{item.taskTag,jdbcType=VARCHAR}, #{item.taskParam,jdbcType=VARCHAR}, #{item.taskAttachParam,jdbcType=VARCHAR}, #{item.updTm,jdbcType=TIMESTAMP}, #{item.insTm,jdbcType=TIMESTAMP}) </foreach> </insert> <insert id="batchInsertSelective" keyColumn="id" keyProperty="list.id" parameterType="map" useGeneratedKeys="true"> insert into `file_task` ( <foreach collection="selective" item="column" separator=","> ${column.escapedColumnName} </foreach> ) values <foreach collection="list" item="item" separator=","> ( <foreach collection="selective" item="column" separator=","> <if test="'task_name'.toString() == column.value"> #{item.taskName,jdbcType=VARCHAR} </if> <if test="'task_type'.toString() == column.value"> #{item.taskType,jdbcType=TINYINT} </if> <if test="'task_tag'.toString() == column.value"> #{item.taskTag,jdbcType=VARCHAR} </if> <if test="'task_param'.toString() == column.value"> #{item.taskParam,jdbcType=VARCHAR} </if> <if test="'upd_tm'.toString() == column.value"> #{item.updTm,jdbcType=TIMESTAMP} </if> <if test="'ins_tm'.toString() == column.value"> #{item.insTm,jdbcType=TIMESTAMP} </if> </foreach> ) </foreach> </insert>
batchUpdate
batchUpdate一般分为两种场景:
- 批量更新的内容完全一致
- 批量更新的内容不一致
对于场景1,就是常规的SQL,控制好对应的where条件即可,比如
update test set a = 1 where id in (1,2,3);
而对于场景2,就没办法使用一条SQL语句来更新了。我们需要借助:
- MyBatis的动态sql能力
<update id="batchUpdate" parameterType="java.util.List"> <foreach collection="list" item="item" separator=";"> update test <set> a=#{item.a,jdbcType=BIGINT}, b=#{item.b,jdbcType=VARCHAR}, c=#{item.c,jdbcType=INTEGER} </set> where id = #{item.id,jdbcType=BIGINT} </foreach> </update>
- 开启mysql-connector上的
allowMultiQueries
参数,允许一次性执行多条语句
batchDelete
批量场景也比较单一,一般来说控制好where条件就可以了
batchExecute
batchExecute
和上面列的三种不属于同一个维度。batchExecute
是jdbc
规范里定义的一种执行模式——批量执行模式,各个驱动都要去实现这个标准。使用方式如下:@SneakyThrows public void testBatch() { String connectString = "jdbc:mysql://localhost/test?user=root&password=toor&useLocalSessionState=true&useSSL=false"; Class.forName("com.mysql.jdbc.Driver").newInstance(); try (Connection conn = DriverManager.getConnection(connectString)) { try (PreparedStatement psts = conn.prepareStatement("insert into words (`word`) VALUES(?)")) { for (int i = 0; i < 100000; i++) { psts.setString(1, i + ""); psts.addBatch(); } psts.executeBatch(); } } }
先通过
addBatch
把单个sql添加到待执行列表里,然后再通过executeBatch
一次性批量执行。而最终执行的时候mysql-connector
会把每条sql都用;
拼接在一起。其实和一条手动用;
拼接好的sql直接去执行的效果没有太大的区别。PreparedStatement
如果使用的是
PreparedStatement
,那么只有配置了rewriteBatchedStatements=true
才能有上面说的效果,否则虽然调用的是addBatch
和executeBatch
,但是最后执行的时候还是一条条语句发送。// com.mysql.cj.jdbc.ClientPreparedStatement#executeBatchInternal protected long[] executeBatchInternal() throws SQLException { // 必须开启rewriteBatchedStatements if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) { if (((PreparedQuery<?>) this.query).getParseInfo().canRewriteAsMultiValueInsertAtSqlLevel()) { return executeBatchedInserts(batchTimeout); } if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null && this.query.getBatchedArgs().size() > 3 /* cost of option setting rt-wise */) { return executePreparedBatchAsMultiStatement(batchTimeout); } } // 否则,一条条串行执行 return executeBatchSerially(batchTimeout); }
Statement
而如果使用的是普通的
Statement
,通过配置rewriteBatchedStatements=true
或者allowMultiQueries=true
都可以起到批量的效果。batch模式下的SQL优化
在batch模式下,如果你使用的是
PreparedStatement
,并且配置了rewriteBatchedStatements=true
,驱动还会优化我们的语句。比如:insert优化
insert into t(a) values(10); insert into t(a) values(11); insert into t(a) values(12);
会被优化成
insert into t(a) values(10),(11),(12);
需要注意的是,insert语句的改写,只能将多个values后的值拼接成一整条SQL,insert语句如果有其他差异将无法被改写。例如:
insert into t (a) values (10) on duplicate key update a = 10; insert into t (a) values (11) on duplicate key update a = 11; insert into t (a) values (12) on duplicate key update a = 12;
上述 insert 语句将无法被改写成一条语句。该例子中,如果将SQL改写成如下形式:
insert into t (a) values (10) on duplicate key update a = values(a); insert into t (a) values (11) on duplicate key update a = values(a); insert into t (a) values (12) on duplicate key update a = values(a);
即可满足改写条件,最终被改写成:
insert into t (a) values (10), (11), (12) on duplicate key update a = values(a);
update优化
PreparedStatement
的批量更新时如果超过3条更新语句,则SQL语句会改写为multiple-queries的形式并发送,例如:update t set a = 10 where id = 1; update t set a = 11 where id = 2; update t set a = 12 where id = 3;
而普通的
Statement
,只要配置了allowMultiQueries=true
或者是配置了rewriteBatchedStatements=true
并且批量更新时如果超过4条更新语句,那么批量更新无论多少条语句都会改写成multiple-queries的形式发送。注意:如果你配置了
allowMultiQueries=false&rewriteBatchedStatements=true
,那么ClientPreparedStatement
会在执行过程中再发送一条set allowMultiQueries=true
的命令。所以尽量不要这样配置,会增加一次IO。batch模式下的ServerPreparedStatement
如果你不了解
ClientPreparedStatement
和ServerPreparedStatement
,可以看我之前的文章。在不同的PreparedStatement
模式下,batch模式的表现也不一样。ClientPreparedStatement
和之前介绍的基本一样,而ServerPreparedStatement
则有一些区别:在batch模式下,
ServerPreparedStatement
会发送两次Prepare
命令,第一次是在调用prepareStatement
时。第二次是在调用executeBatch
时,需要预编译拼接后的SQL。即使两次的内容是一样的,并且开启了cachePrepStmts
,也需要调用两次,因为cachePrepStmts
并不作用于此(它作用在Connection
上,控制需不需要创建PreparedStatement
)ServerPreparedStatement
执行过程的抓包信息(两次Prepare)如下图所示:第一次预编译很好理解,而第二次预编译的具体的调用链路如下:
可以看到,在
executeBatch
的时候又创建了一个ServerPreparedStatement
,而它的构造函数里就会向MySQL发送Prepare命令,预编译这条拼接后的SQL。返回值问题
注意:对于批量SQL的返回值指的是影响行数
普通模式下的返回值
multi-value insert的返回值
返回插入的行数
用;
拼接的批量插入
返回第一条语句的插入行数。
虽然抓包每一条语句都会返回执行结果和影响行数。
batchUpdate的返回值
返回更新的记录行数
用;
拼接的multi-queries更新
返回第一条语句的更新行数(命中行数,而不是实际产生更新的行数)。
虽然抓包每一条语句都会返回执行结果和影响行数。
batch模式下的返回值
batchInsert的返回值
这里针对是否rewrite是两种不同的返回值:
- 在不rewrite的情况下,
executeBatch
返回值int[]
里针对每一条语句,返回的就是实际插入的记录数量。
- 而在rewrite的情况下,由于无法拿到每条语句的实际插入数量,索性把每一条语句的更新行数设置成一个固定值(-2),表示成功但是影响行数未知。
而是否rewrite则取决于使用的
Statement
类型以及rewriteBatchedStatements
配置。下面这坨就是使用
PreparedStatement
并且开启了rewriteBatchedStatements
的处理代码。写得真是灾难现场啊,简简单单一段逻辑给写得这么晦涩难懂。想要实现的逻辑是:对batch里的insert语句做rewrite时,按照maxAllowedPacket
做拆分,单次请求的大小不能超过maxAllowedPacket
。// com.mysql.cj.jdbc.ClientPreparedStatement#executeBatchedInserts protected long[] executeBatchedInserts(int batchTimeout) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { String valuesClause = ((PreparedQuery<?>) this.query).getParseInfo().getValuesClause(); JdbcConnection locallyScopedConn = this.connection; if (valuesClause == null) { return executeBatchSerially(batchTimeout); } // 插入语句总数量 int numBatchedArgs = this.query.getBatchedArgs().size(); if (this.retrieveGeneratedKeys) { this.batchedGeneratedKeys = new ArrayList<>(numBatchedArgs); } // 每批次执行的语句数量 int numValuesPerBatch = ((PreparedQuery<?>) this.query).computeBatchSize(numBatchedArgs); // 为什么要写这种逻辑? if (numBatchedArgs < numValuesPerBatch) { numValuesPerBatch = numBatchedArgs; } JdbcPreparedStatement batchedStatement = null; int batchedParamIndex = 1; long updateCountRunningTotal = 0; int numberToExecuteAsMultiValue = 0; int batchCounter = 0; CancelQueryTask timeoutTask = null; SQLException sqlEx = null; long[] updateCounts = new long[numBatchedArgs]; try { try { // 这里这个新创建的PreparedStatement就是按照单批次最大的语句数量来构造的 batchedStatement = /* FIXME -if we ever care about folks proxying our JdbcConnection */ prepareBatchedInsertSQL(locallyScopedConn, numValuesPerBatch); timeoutTask = startQueryTimer(batchedStatement, batchTimeout); // 为什么这里还要重复判断? // numberToExecuteAsMultiValue代表需要执行的次数(取整了,所以还有余数要补) numberToExecuteAsMultiValue = numBatchedArgs < numValuesPerBatch ? numBatchedArgs : numBatchedArgs / numValuesPerBatch; // numberToExecuteAsMultiValue执行次数需要填充的参数数量 int numberArgsToExecute = numberToExecuteAsMultiValue * numValuesPerBatch; // 第一层循环是用来填充参数 for (int i = 0; i < numberArgsToExecute; i++) { // 第二层循环是用来提交batch请求 // 但是注意,坑爹的地方来了,最后一次是没有办法在这里提交的 if (i != 0 && i % numValuesPerBatch == 0) { try { updateCountRunningTotal += batchedStatement.executeLargeUpdate(); } catch (SQLException ex) { sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex); } getBatchedGeneratedKeys(batchedStatement); batchedStatement.clearParameters(); batchedParamIndex = 1; } batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++)); } // 在这里提交numberToExecuteAsMultiValue的最后一次 try { updateCountRunningTotal += batchedStatement.executeLargeUpdate(); } catch (SQLException ex) { sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex); } getBatchedGeneratedKeys(batchedStatement); // 这里计算余数 numValuesPerBatch = numBatchedArgs - batchCounter; } finally { if (batchedStatement != null) { batchedStatement.close(); batchedStatement = null; } } // 这里是处理余数的逻辑 try { if (numValuesPerBatch > 0) { batchedStatement = prepareBatchedInsertSQL(locallyScopedConn, numValuesPerBatch); if (timeoutTask != null) { timeoutTask.setQueryToCancel(batchedStatement); } batchedParamIndex = 1; while (batchCounter < numBatchedArgs) { batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++)); } try { updateCountRunningTotal += batchedStatement.executeLargeUpdate(); } catch (SQLException ex) { sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex); } getBatchedGeneratedKeys(batchedStatement); } if (sqlEx != null) { throw SQLError.createBatchUpdateException(sqlEx, updateCounts, this.exceptionInterceptor); } // 这里开始处理返回值,插入语句超过1条且执行没有出错的情况下,每一条都会被设置为java.sql.Statement.SUCCESS_NO_INFO,也就是-2 if (numBatchedArgs > 1) { long updCount = updateCountRunningTotal > 0 ? java.sql.Statement.SUCCESS_NO_INFO : 0; for (int j = 0; j < numBatchedArgs; j++) { updateCounts[j] = updCount; } } else { updateCounts[0] = updateCountRunningTotal; } return updateCounts; } finally { if (batchedStatement != null) { batchedStatement.close(); } } } finally { stopQueryTimer(timeoutTask, false, false); resetCancelledState(); } } }
batchUpdate的返回值
批量更新并不会合并语句,所以返回值不会像insert一样出现未知的情况。但是
PreparedStatement
批量更新里考虑了一种特殊场景,就是add到batch里的语句本身就是用;
拼接的多条更新语句。这样在rewrite成multi-queries之后其实看起来就像单条语句rewrite之后的一样,它最终返回的int[]
的数量其实是所有拆分后单条更新语句的影响行数。以上情况仅针对批量update在rewrite场景下。非rewrite场景,那么返回的
int[]
数量不会考虑拆分逻辑,并且只会取;
分割的第一条语句的影响行数。而普通的
Statement
在batch模式下如果添加了用;
拼接的多条更新语句,则在执行时会报错:Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 2 at com.mysql.cj.jdbc.StatementImpl.processMultiCountsAndKeys(StatementImpl.java:1089) at com.mysql.cj.jdbc.StatementImpl.executeBatchUsingMultiQueries(StatementImpl.java:1042) at com.mysql.cj.jdbc.StatementImpl.executeBatchInternal(StatementImpl.java:859) at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:814) at BatchUpdateTest.statementTest(BatchUpdateTest.java:25) at BatchUpdateTest.main(BatchUpdateTest.java:18)
事务问题
批量执行和事务完全是八竿子打不着的两个概念。当然有一种特殊情况,就是batch模式下insert语句被rewrite成一条
multi-value
的insert语句,这样的语句是会保证一起成功/一起失败的。但是不同的批次之间也没有事务保证。而通过
;
连接的语句,到mysql之后都是分开执行的,都不存在事务。但是执行是有顺序的,比如第一条语句执行失败了,后续的语句就不会再执行了。批量插入如何拿到自增主键
我们从MySQL拿插入后的自增主键值的方式有两种:
- 通过JDBC规范提供的
Statement.getGeneratedKeys()
方法
- 通过在同一个Session里执行SQL
SELECT LAST_INSERT_ID()
对于MySQL来说,我们一般都使用第一种,因为能减少一次数据库交互。并且,在批量插入的场景下,第一种方式支持得更好。
下面就以第一种方式为例,看看不同的批量插入场景:
- 通过
;
连接的多个单条插入的语句
- multi-value insert语句
- 通过addBatch串联起来的批量插入语句
通过;
连接的多个单条插入的语句
这种批量插入执行过程抓包如下:
可以看到它一次返回了每一条插入语句的执行结果和对应的自增主键。但是遗憾的是,在取值的时候,只取到了第一个:
public java.sql.ResultSet getGeneratedKeys() throws SQLException { synchronized (checkClosed().getConnectionMutex()) { if (!this.retrieveGeneratedKeys) { throw SQLError.createSQLException(Messages.getString("Statement.GeneratedKeysNotRequested"), MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor()); } // 批量插入会在执行过程中维护batchedGeneratedKeys if (this.batchedGeneratedKeys == null) { if (this.lastQueryIsOnDupKeyUpdate) { return this.generatedKeysResults = getGeneratedKeysInternal(1); } return this.generatedKeysResults = getGeneratedKeysInternal(); } // 单条execute就会走到下面的逻辑,所以只会读取第一条插入结果 String encoding = this.session.getServerSession().getCharacterSetMetadata(); int collationIndex = this.session.getServerSession().getMetadataCollationIndex(); Field[] fields = new Field[1]; fields[0] = new Field("", "GENERATED_KEY", collationIndex, encoding, MysqlType.BIGINT_UNSIGNED, 20); this.generatedKeysResults = this.resultSetFactory.createFromResultsetRows(ResultSet.CONCUR_READ_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE, new ResultsetRowsStatic(this.batchedGeneratedKeys, new DefaultColumnDefinition(fields))); return this.generatedKeysResults; } }
multi-value insert语句
没有任何意外,可以按顺序拿到所有的自增主键
通过addBatch串联起来的批量插入语句
没有任何意外,可以按顺序拿到所有的自增主键,但是存在一个特殊场景。
batch模式下的一个特殊场景
如果在batch模式下,使用
Statement
并且存在multi-value的insert语句,那么在执行后通过statement.getGeneratedKeys();
获取自增主键时,可能会丢失。
multi-value
的语句只能返回第一条自增主键。而在使用
PreparedStatement
的情况下,不存在此问题。性能隐患
批量语句的主要性能隐患就是可能会产生超大的SQL。
- 一方面可能导致应用的内存占用升高,甚至溢出
- 另一方面可能造成大事务
所以在使用的过程中必须要注意每批次的大小。
参考
- 作者:黑微狗
- 链接:https://blog.hwgzhu.com/article/mysql-batch-sql
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
相关文章