Spring Boot 使用 ThreadPoolTaskExecutor 批量插入大量数据
在业务系统中,经常会遇到批量导入数据的场景。
例如:
Excel 批量导入;
日志数据入库;
第三方数据同步;
历史数据迁移;
大批量初始化数据。
如果数据量只有几千条,单线程批量插入通常就够了。
但如果数据量达到几十万、上百万级,单线程处理耗时会明显增加。这个时候,可以考虑使用线程池对数据进行分片,然后并发批量插入。
记录一种基于 Spring Boot、MyBatis-Plus 和 ThreadPoolTaskExecutor 的批量插入实现方式。
一、实现思路
整体思路比较简单:
1. 准备待插入的数据集合
2. 按固定大小切分成多个子集合
3. 每个子集合交给一个异步任务处理
4. 使用 ThreadPoolTaskExecutor 执行异步任务
5. 使用 CountDownLatch 等待所有任务执行完成
6. 最后返回处理结果
例如有 200 万条数据,可以按每 100 条一组进行切分。
每一组数据通过一个异步任务执行批量插入。
主线程负责提交任务,并等待所有任务执行完成。
二、线程池配置
先在配置文件中添加线程池参数。
# 异步线程配置
# 核心线程数
async.executor.thread.core_pool_size=30
# 最大线程数
async.executor.thread.max_pool_size=30
# 队列大小
async.executor.thread.queue_capacity=99988
# 线程名称前缀
async.executor.thread.name.prefix=async-importDB-
这里配置了:
核心线程数:30;
最大线程数:30;
队列容量:99988;
线程名前缀:
async-importDB-。
实际项目中,这些参数不要直接照抄,需要结合机器配置、数据库连接池大小、数据库承载能力和单批数据大小进行调整。
三、注册 ThreadPoolTaskExecutor
接下来创建线程池配置类。
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("init asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix(namePrefix);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
这里使用了:
@EnableAsync
开启 Spring 异步能力。
然后通过:
@Bean(name = "asyncServiceExecutor")
注册一个指定名称的线程池。
后续业务方法可以通过:
@Async("asyncServiceExecutor")
指定使用这个线程池执行异步任务。
四、创建异步插入任务
定义异步任务方法,用来执行单个分片的数据插入。
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
@Override
@Async("asyncServiceExecutor")
public void executeAsync(List<LogOutputResult> logOutputResults,
LogOutputResultMapper logOutputResultMapper,
CountDownLatch countDownLatch) {
try {
log.info("start executeAsync, size={}", logOutputResults.size());
logOutputResultMapper.addLogOutputResultBatch(logOutputResults);
log.info("end executeAsync, size={}", logOutputResults.size());
} finally {
countDownLatch.countDown();
}
}
}
这里有一个关键点:
finally {
countDownLatch.countDown();
}
countDown() 必须放在 finally 中。
否则某个异步任务执行异常后,如果没有调用 countDown(),主线程会一直阻塞在:
countDownLatch.await();
最终导致接口或任务无法结束。
五、批量插入主流程
主流程中先准备数据,然后切分集合,再提交异步任务。
@Override
public int testMultiThread() {
List<LogOutputResult> logOutputResults = getTestData();
// 每 100 条数据切分为一组
List<List<LogOutputResult>> lists = ConvertHandler.splitList(logOutputResults, 100);
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
for (List<LogOutputResult> listSub : lists) {
asyncService.executeAsync(listSub, logOutputResultMapper, countDownLatch);
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("批量插入等待中断", e);
}
return logOutputResults.size();
}
这里的关键步骤是:
List<List<LogOutputResult>> lists = ConvertHandler.splitList(logOutputResults, 100);
将大集合切成多个小集合。
然后每个小集合提交给异步线程池:
asyncService.executeAsync(listSub, logOutputResultMapper, countDownLatch);
主线程最后通过:
countDownLatch.await();
等待所有异步任务执行完成。
六、集合切分示例
如果项目中没有现成的集合切分工具,可以自己写一个简单方法。
import java.util.ArrayList;
import java.util.List;
public class ConvertHandler {
public static <T> List<List<T>> splitList(List<T> source, int batchSize) {
List<List<T>> result = new ArrayList<>();
if (source == null || source.isEmpty()) {
return result;
}
if (batchSize <= 0) {
throw new IllegalArgumentException("batchSize must be greater than 0");
}
int size = source.size();
for (int i = 0; i < size; i += batchSize) {
int end = Math.min(i + batchSize, size);
result.add(source.subList(i, end));
}
return result;
}
}
比如总共有 2000003 条数据,每 100 条一组,大约会切成 20001 个任务。
七、Mapper 批量插入示例
Mapper 层可以使用 MyBatis 的 foreach 批量插入。
示例:
<insert id="addLogOutputResultBatch">
INSERT INTO log_output_result (
id,
content,
create_time
)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.id},
#{item.content},
#{item.createTime}
)
</foreach>
</insert>
对应 Mapper 接口:
import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface LogOutputResultMapper {
int addLogOutputResultBatch(@Param("list") List<LogOutputResult> list);
}
实际项目中,字段需要按自己的表结构调整。
八、测试结果记录
测试数据量:
2000003 条
测试方式:
每 100 条数据为一批
使用 30 个线程并发插入
测试结果:
从这组测试结果看,多线程批量插入相比单线程有明显提升。
不过这个结果只代表当时测试环境下的表现。
实际性能会受到很多因素影响,例如:
机器 CPU;
内存;
数据库连接池大小;
数据库服务器性能;
网络延迟;
表索引数量;
单批次数据大小;
是否开启事务;
数据库写入压力。
因此,线程数和批次大小不能固定照搬,需要结合具体环境压测。
九、数据正确性检查
批量并发插入后,需要检查两个问题:
1. 是否重复插入
可以按主键或业务唯一键分组检查。
示例:
SELECT id, COUNT(*)
FROM log_output_result
GROUP BY id
HAVING COUNT(*) > 1;
如果查询结果为空,说明没有发现重复主键数据。
如果使用的是业务唯一键,也可以按业务唯一键检查。
2. 数据是否完整
可以检查总数量:
SELECT COUNT(*)
FROM log_output_result;
确认入库数量是否和原始数据量一致。
例如原始数据是:
2000003 条
那么最终数据库中的数量也应该是:
2000003 条
十、线程数不是越多越好
多线程确实可以提升插入效率,但线程数不是越多越好。
线程数过多可能导致:
CPU 上下文切换增加;
数据库连接不够用;
数据库写入压力过大;
锁竞争变严重;
事务提交压力变大;
应用内存占用上升。
有一种常见估算方式:
CPU 核心数 * 2 + 2
但这只能作为初始参考。
对于批量入库这种场景,更重要的是结合数据库连接池和数据库写入能力调整。
例如:
如果数据库连接池最大连接数只有 20,线程池开 100 个线程意义不大;
如果单批数据太小,任务数量过多,线程调度成本会上升;
如果单批数据太大,单条 SQL 过长,也可能影响数据库执行效率。
比较稳妥的做法是从较小线程数开始压测,例如:
5
10
20
30
同时观察:
应用 CPU;
JVM 内存;
数据库连接数;
数据库 CPU;
慢 SQL;
插入耗时。
最终选择一个整体吞吐和稳定性都比较好的配置。
十一、需要注意的几个问题
1. 注意数据库连接池大小
线程池并发执行插入任务时,每个任务都可能占用数据库连接。
如果线程池线程数大于数据库连接池最大连接数,任务可能会阻塞等待连接。
所以线程池大小要和数据库连接池配置一起看。
2. 注意事务边界
如果每个异步任务单独执行批量插入,那么每个任务通常是独立事务。
如果希望所有数据要么全部成功,要么全部失败,这种多线程拆分方式就不适合直接使用。
因为跨多个线程统一控制一个大事务比较复杂,也不推荐这么做。
这类场景更适合:
先落临时表;
再做校验;
最后通过单事务合并;
或者设计补偿和重试机制。
3. 注意异常收集
示例中 CountDownLatch 只负责等待任务完成,不负责收集异常。
如果异步任务中插入失败,主线程不一定能直接感知到具体错误。
可以增加一个线程安全集合记录异常。
示例:
List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
异步任务中捕获异常后写入:
try {
logOutputResultMapper.addLogOutputResultBatch(logOutputResults);
} catch (Exception e) {
errors.add(e);
log.error("批量插入失败", e);
} finally {
countDownLatch.countDown();
}
主线程等待结束后判断:
if (!errors.isEmpty()) {
throw new RuntimeException("部分批量任务执行失败");
}
4. 注意批次大小
示例中每 100 条一批。
这个值不是固定答案。
批次太小,任务数量太多,线程调度和 SQL 执行次数会增加。
批次太大,单条 SQL 过长,可能导致数据库压力过大。
需要结合实际环境调整,例如:
100
500
1000
2000
逐步压测。
5. 注意幂等性
批量插入过程中,如果某些任务成功、某些任务失败,再次重试时可能产生重复数据。
因此批量导入场景最好有幂等设计,例如:
使用业务唯一键;
使用唯一索引;
插入前清理批次数据;
使用导入批次号;
失败后可按批次回滚或重试。
十二、完整流程回顾
整个流程可以概括为:
1. 配置 ThreadPoolTaskExecutor
2. 使用 @Async 指定线程池
3. 准备待插入数据
4. 将大集合按 batchSize 切分
5. 每个分片提交一个异步任务
6. 每个异步任务执行批量插入
7. CountDownLatch 等待所有任务完成
8. 检查数据总量和重复数据
9. 根据压测结果调整线程数和批次大小
结论
使用 ThreadPoolTaskExecutor 可以提升大量数据批量插入的处理效率。
在测试中,2000003 条数据:
单线程耗时约 4.34 分钟;
30 个线程并发插入耗时约 1.1 分钟。
不过,多线程批量插入不是简单地把线程数调大。
真正需要关注的是:
线程池大小;
数据库连接池大小;
单批次数据量;
数据库写入能力;
异常处理;
事务边界;
数据幂等性;
压测结果。