@Override
public PageInfo<Entity> queryTest(BasePage queryBo) {
//构建存放并行CompletableFuture实例的集合
ArrayList<CompletableFuture<List<Entity>>> futures = new ArrayList<>();
//业务查询
PageHelper.startPage(queryBo.getPageNum(), queryBo.getPageSize());
List<Entity> list = testMapper.selectTestByCondition(queryBo);
//流处理会导致分页字段失效,提前构建分页
PageInfo<DgRiskStatisticV2VO> pageInfo = new PageInfo<>(list);
//批量查询并行数
int batchSize = 1;
//循环分割集合并开启并行线程处理子业务查询
for (int i = 0; i < list.size(); i = i + batchSize) {
int endIndex = Math.min(i + batchSize, list.size());
List<Entity> subList = list.subList(i, endIndex);
CompletableFuture<List<Entity>> future = CompletableFuture.supplyAsync(() -> performQuery(subList));
futures.add(future);
}
//使用allOf方法等待所有查询完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
//所有查询完毕,合并所有并行线程结果
CompletableFuture<List<Entity>> finalFutures = allFutures.thenApply(v -> futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
//取出所有查询结果并返回
List<Entity> result = finalFutures.join();
pageInfo.setList(result);
return pageInfo;
}
/**
* 并行子查询
*
* @param parentList
* @return newList
*/
private List<Entity> performQuery(List<Entity> parentList) {
parentList.forEach(x -> {
//子业务处理或查询
...
});
return newList;
}