在业务开发中,经常会遇到批量处理数据的场景。
例如:
批量导入 Excel 数据;
批量调用第三方接口;
批量处理订单;
批量同步用户数据;
大列表分片计算;
批量生成报表数据。
如果直接用单线程遍历整个列表,数据量较大时处理时间会比较长。
一种常见做法是:
将大列表拆分成多个小列表,每个小列表交给线程池异步处理,最后再汇总所有子任务结果。
本文记录一种基于 Spring Boot、ThreadPoolTaskExecutor、CompletableFuture 和 Function<List<T>, ProcessResult> 的实现方式。
一、实现思路
整体流程如下:
1. 准备一个大列表
2. 按指定 chunkSize 拆分成多个子列表
3. 每个子列表交给 Function 处理
4. 使用异步线程池并发执行子任务
5. 每个子任务返回 ProcessResult
6. 主线程等待所有任务完成
7. 汇总所有子任务结果这样可以把“数据拆分、异步调度、业务处理、结果汇总”拆开。
其中:
列表拆分逻辑由
ListProcessor负责;子任务处理逻辑由调用方通过
Function传入;线程池由 Spring 管理;
子任务结果由
ProcessResult封装。
二、结果封装类 ProcessResult
先定义一个结果对象,用于保存每个子任务的处理结果。
public class ProcessResult {
private int sum;
private StringBuilder msg;
public ProcessResult(int sum, StringBuilder msg) {
this.sum = sum;
this.msg = msg;
}
public int getSum() {
return sum;
}
public StringBuilder getMsg() {
return msg;
}
}这里示例中保存了两个字段:
sum:用于统计子任务的计算结果;msg:用于保存子任务处理过程中的消息。
实际业务中可以根据需要调整。
例如可以改成:
private int successCount;
private int failCount;
private List<String> errorMessages;
也可以封装成更通用的业务结果对象。
三、配置独立线程池
为了避免和项目中其他异步任务互相影响,建议单独配置一个线程池。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class ThreadPoolConfig {
@Bean(name = "customThreadPool")
public Executor customThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("custom-thread-");
executor.initialize();
return executor;
}
}
这里配置了:
核心线程数:5
最大线程数:10
队列容量:25
线程名前缀:custom-thread-
实际项目中不建议把这些参数写死在代码中。
更推荐放到配置文件里,例如:
async:
task:
core-pool-size: 5
max-pool-size: 10
queue-capacity: 25
thread-name-prefix: custom-thread-
线程池参数需要结合业务处理耗时、CPU、数据库连接池、第三方接口限流等因素综合调整。
四、列表处理器 ListProcessor
核心处理类负责三件事:
拆分列表;
提交异步任务;
收集任务结果。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
@Component
public class ListProcessor<T> {
@Autowired
private ApplicationContext applicationContext;
/**
* 异步处理主方法。
* 负责拆分列表、异步调用处理方法,并收集处理结果。
*/
public List<ProcessResult> processList(List<T> list,
int chunkSize,
Function<List<T>, ProcessResult> function)
throws InterruptedException, ExecutionException {
List<List<T>> chunks = splitList(list, chunkSize);
List<CompletableFuture<ProcessResult>> futures = new ArrayList<>();
ListProcessor<T> self = applicationContext.getBean(this.getClass());
for (List<T> chunk : chunks) {
futures.add(self.processChunkAsync(chunk, function));
}
List<ProcessResult> results = new ArrayList<>();
for (CompletableFuture<ProcessResult> future : futures) {
results.add(future.get());
}
return results;
}
/**
* 异步处理单个子列表。
*/
@Async("customThreadPool")
public CompletableFuture<ProcessResult> processChunkAsync(List<T> chunk,
Function<List<T>, ProcessResult> function) {
return CompletableFuture.completedFuture(function.apply(chunk));
}
/**
* 按指定大小拆分列表。
*/
private List<List<T>> splitList(List<T> list, int chunkSize) {
List<List<T>> chunks = new ArrayList<>();
for (int i = 0; i < list.size(); i += chunkSize) {
chunks.add(list.subList(i, Math.min(i + chunkSize, list.size())));
}
return chunks;
}
}
这里有一个比较关键的写法:
ListProcessor<T> self = applicationContext.getBean(this.getClass());
因为 @Async 是基于 Spring AOP 代理实现的。
如果在同一个类中直接调用:
this.processChunkAsync(...)
不会经过 Spring 代理,@Async 不会生效。
所以这里通过 Spring 容器拿到当前 Bean 的代理对象,再调用异步方法。
五、为什么用 Function
Function<List<T>, ProcessResult> 的作用是把业务处理逻辑交给调用方。
列表处理器只关心:
如何拆分;
如何异步执行;
如何收集结果。
它不关心每个子列表具体怎么处理。
例如:
Function<List<Integer>, ProcessResult> function = chunk -> {
int sum = 0;
StringBuilder msg = new StringBuilder();
for (int num : chunk) {
sum += num;
msg.append(num).append(" ");
}
return new ProcessResult(sum, msg);
};
这样 ListProcessor 就可以复用于不同业务场景。
例如:
批量计算
批量入库
批量调用接口
批量校验数据
批量生成结果
调用方只需要传入不同的 Function 即可。
六、主程序测试示例
启用异步能力:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
public class MainApplication {
public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
}测试逻辑可以写在 CommandLineRunner 中。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.function.Function;
@Component
public class TestRunner implements CommandLineRunner {
@Autowired
private ListProcessor<Integer> listProcessor;
@Override
public void run(String... args) throws Exception {
List<Integer> list = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
list.add(i);
}
int chunkSize = 3;
Function<List<Integer>, ProcessResult> function = chunk -> {
int sum = 0;
StringBuilder msg = new StringBuilder();
for (int num : chunk) {
sum += num;
msg.append(num).append(" ");
}
return new ProcessResult(sum, msg);
};
List<ProcessResult> results = listProcessor.processList(list, chunkSize, function);
int totalSum = 0;
StringJoiner joiner = new StringJoiner(", ");
for (ProcessResult result : results) {
totalSum += result.getSum();
joiner.add(result.getMsg().toString());
}
System.out.println("sum = " + totalSum);
System.out.println("msg = " + joiner);
}
}列表:
1,2,3,4,5,6,7,8,9,10按照每 3 个元素拆分后:
[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]每个子列表单独处理,最后汇总结果。
七、不需要返回值时的处理方式
有些场景不需要子任务返回结果。
例如:
批量发送消息;
批量写日志;
批量更新状态;
批量调用没有返回值的接口。
这种情况下,可以使用 Consumer<List<T>> 和 CountDownLatch。
public <T> void processListWithoutResult(List<T> list,
int chunkSize,
Consumer<List<T>> consumer)
throws InterruptedException {
if (list == null || list.isEmpty()) {
return;
}
if (list.size() <= chunkSize) {
consumer.accept(list);
return;
}
List<List<T>> chunks = splitList(list, chunkSize);
CountDownLatch latch = new CountDownLatch(chunks.size());
ListProcessor<T> self = applicationContext.getBean(this.getClass());
for (List<T> chunk : chunks) {
self.processChunkAsync(chunk, consumer, latch);
}
latch.await();
}异步执行方法:
@Async("customThreadPool")
public <T> void processChunkAsync(List<T> chunk,
Consumer<List<T>> consumer,
CountDownLatch latch) {
try {
consumer.accept(chunk);
} finally {
latch.countDown();
}
}这里必须把:
latch.countDown();放在 finally 中。
否则某个子任务异常时,主线程可能一直阻塞在:
latch.await();八、列表拆分方法
列表拆分是整个流程的基础。
private List<List<T>> splitList(List<T> list, int chunkSize) {
List<List<T>> chunks = new ArrayList<>();
for (int i = 0; i < list.size(); i += chunkSize) {
chunks.add(list.subList(i, Math.min(i + chunkSize, list.size())));
}
return chunks;
}
这里使用:
Math.min(i + chunkSize, list.size())
是为了处理最后一个子列表不足 chunkSize 的情况。
例如列表长度是 10,chunkSize 是 3:
[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]
最后一个子列表只有 1 个元素。
九、需要注意的问题
1. chunkSize 要合理设置
chunkSize 太小,会产生大量异步任务,线程调度成本变高。
chunkSize 太大,每个任务处理时间变长,并发效果下降。
需要结合实际业务压测。
例如:
100
500
1000
2000
都可以作为初始测试值。
2. 线程池大小不要盲目调大
线程池大小需要结合任务类型判断。
如果任务主要是 CPU 计算,线程数不宜远大于 CPU 核数。
如果任务主要是 IO,例如数据库、网络接口,可以适当增加线程数,但也要考虑:
数据库连接池大小;
第三方接口限流;
JVM 内存;
下游服务承载能力。
3. future.get() 会阻塞
在结果汇总阶段:
future.get()会阻塞等待子任务完成。
如果子任务异常,get() 会抛出 ExecutionException。
实际项目中建议对异常进行统一处理。
例如:
try {
results.add(future.get());
} catch (ExecutionException e) {
// 记录失败任务
}4. subList 是原列表视图
list.subList() 返回的是原列表的一段视图,不是完全独立的新列表。
如果后续原列表会被修改,可能引发问题。
更稳妥的写法是复制一份:
chunks.add(new ArrayList<>(list.subList(i, Math.min(i + chunkSize, list.size()))));如果只是只读处理,直接使用 subList() 问题不大。
5. @Async 同类方法调用不会生效
如果在同一个类里直接调用异步方法:
this.processChunkAsync(...)不会触发异步。
需要通过 Spring 代理对象调用。
本文示例中使用:
applicationContext.getBean(this.getClass())来获取当前 Bean 代理对象。
也可以把异步方法拆到另一个 Bean 中,这样结构更清晰。
十、适用场景
这种方式适合:
大列表拆分处理;
子任务之间互不依赖;
每个子任务可以独立执行;
最终需要汇总处理结果;
业务逻辑希望通过函数式接口传入。
例如:
批量校验 Excel 数据
批量插入数据库
批量调用远程接口
批量生成统计结果
批量处理订单状态十一、不适合的场景
这种方式不适合:
子任务之间有强依赖关系;
所有子任务必须在同一个数据库事务中完成;
任务执行顺序必须严格保证;
子任务数量巨大但没有限流控制;
下游系统无法承受并发压力。
特别是数据库操作场景,需要额外注意事务边界。
如果多个子任务分别在线程池中执行,它们通常不在同一个事务中。
如果业务要求“全部成功或全部失败”,就不能简单用这种方式拆分并发执行。
十二、可以改进的地方
1. 使用 allOf 等待所有任务
除了逐个 future.get(),也可以使用:
CompletableFuture.allOf(...)例如:
CompletableFuture<Void> allFuture = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allFuture.join();
List<ProcessResult> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());这种方式更适合统一等待所有任务完成。
2. 增加异常收集
可以定义一个更完整的结果对象,例如:
public class ProcessResult {
private int successCount;
private int failCount;
private List<String> errorMessages;
}这样更适合批量任务处理。
3. 支持超时控制
如果某些子任务可能执行很久,可以增加超时控制,避免主线程无限等待。
例如使用:
future.get(30, TimeUnit.SECONDS)或者在 CompletableFuture 链路中配置超时策略。
结论
本文记录了一种在 Spring Boot 中处理大列表拆分任务的方式。
核心思路是:
使用
splitList将大列表拆成多个小列表;使用
Function<List<T>, ProcessResult>抽象子任务处理逻辑;使用
@Async和独立线程池并发执行子任务;使用
CompletableFuture获取子任务结果;主线程汇总所有子任务结果。
这种方式的优点是:
拆分逻辑和业务逻辑分离;
子任务处理逻辑可以通过
Function灵活传入;可以并发处理大列表;
结果可以统一汇总;
代码结构比较清晰。
当一个大列表可以被拆分成多个互不依赖的小任务时,可以使用 Function + @Async + CompletableFuture 实现并发处理和结果汇总;但需要注意线程池大小、异常处理、事务边界和下游系统承载能力。